4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2016, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_OSC
35 #include <libcfs/libcfs.h>
37 #include <lprocfs_status.h>
38 #include <lustre_debug.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_ha.h>
42 #include <uapi/linux/lustre/lustre_ioctl.h>
43 #include <lustre_net.h>
44 #include <lustre_obdo.h>
45 #include <uapi/linux/lustre/lustre_param.h>
47 #include <obd_cksum.h>
48 #include <obd_class.h>
49 #include <lustre_osc.h>
51 #include "osc_internal.h"
/* Shared request-pool state for OSC BRW RPCs; pool sizing is capped by
 * osc_reqpool_mem_max below. */
53 atomic_t osc_pool_req_count;
54 unsigned int osc_reqpool_maxreqcount;
55 struct ptlrpc_request_pool *osc_rq_pool;
56 /* max memory used for request pool, unit is MB */
57 static unsigned int osc_reqpool_mem_max = 5;
58 module_param(osc_reqpool_mem_max, uint, 0444);
/* Per-request async context for bulk read/write (BRW) RPCs; stored in
 * rq_async_args and unpacked by brw_interpret().
 * NOTE(review): some members are elided in this listing. */
59 struct osc_brw_async_args {
60 struct brw_page **aa_ppga;
61 struct client_obd *aa_cli;
62 struct list_head aa_oaps;
63 struct list_head aa_exts;
/* Grant-shrink RPCs reuse the BRW async-args layout (only aa_oa is used). */
64 #define osc_grant_args osc_brw_async_args
/* Async context for OST_SETATTR / OST_PUNCH; consumed by
 * osc_setattr_interpret(). */
65 struct osc_setattr_args {
66 obd_enqueue_update_f sa_upcall;
/* Async context for OST_SYNC; consumed by osc_sync_interpret(). */
67 struct osc_fsync_args {
68 struct osc_object *fa_obj;
69 obd_enqueue_update_f fa_upcall;
/* Async context for OST_LADVISE; consumed by osc_ladvise_interpret(). */
70 struct osc_ladvise_args {
71 obd_enqueue_update_f la_upcall;
/* Async context for DLM lock enqueue on an OST object. */
72 struct osc_enqueue_args {
73 struct obd_export *oa_exp;
74 enum ldlm_type oa_type;
75 enum ldlm_mode oa_mode;
76 osc_enqueue_upcall_f oa_upcall;
77 struct ost_lvb *oa_lvb;
78 struct lustre_handle oa_lockh;
/* Forward declarations for helpers defined later in this file. */
79 static void osc_release_ppga(struct brw_page **ppga, size_t count);
80 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/* Pack @oa into the OST_BODY buffer of @req, converting it to the wire
 * obdo format negotiated in the import's connect data. */
110 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
112 struct ost_body *body;
114 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
117 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/* Synchronous OST_GETATTR: send @oa's object id to the OST and refresh
 * @oa with the attributes returned in the reply.  On success the server
 * blocksize is overridden with the client BRW size so readahead/IO sizing
 * uses the RPC-sized value.
 * NOTE(review): allocation-failure and rc checks are elided in this
 * listing; error paths free/finish the request. */
120 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
123 struct ptlrpc_request *req;
124 struct ost_body *body;
128 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
132 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
134 ptlrpc_request_free(req);
138 osc_pack_req_body(req, oa);
140 ptlrpc_request_set_replen(req);
142 rc = ptlrpc_queue_wait(req);
146 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
148 GOTO(out, rc = -EPROTO);
150 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
151 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* Report the client BRW size as the blocksize, not the server's. */
153 oa->o_blksize = cli_brw_size(exp->exp_obd);
154 oa->o_valid |= OBD_MD_FLBLKSZ;
158 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: push the attributes in @oa to the OST and
 * copy back the server's view of the object from the reply.
 * @oa must carry a valid group (asserted below). */
163 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
166 struct ptlrpc_request *req;
167 struct ost_body *body;
171 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
173 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
177 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
179 ptlrpc_request_free(req);
183 osc_pack_req_body(req, oa);
185 ptlrpc_request_set_replen(req);
187 rc = ptlrpc_queue_wait(req);
191 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
193 GOTO(out, rc = -EPROTO);
195 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
199 ptlrpc_req_finished(req);
/* Reply interpreter shared by async setattr and punch requests: unpack
 * the reply obdo into sa->sa_oa, then invoke the caller's upcall with
 * the final rc.  -EPROTO if the reply body cannot be unpacked. */
204 static int osc_setattr_interpret(const struct lu_env *env,
205 struct ptlrpc_request *req,
206 struct osc_setattr_args *sa, int rc)
208 struct ost_body *body;
214 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
216 GOTO(out, rc = -EPROTO);
218 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
221 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR.  Packs @oa and either fires the request via
 * ptlrpcd (no reply wait) or registers osc_setattr_interpret() plus the
 * caller's @upcall/@cookie and adds the request to @rqset / the ptlrpcd
 * set.
 * NOTE(review): the if/else branching between the "no rqset" and
 * interpreted paths is elided in this listing — the two ptlrpcd_add_req()
 * calls belong to different branches. */
225 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
226 obd_enqueue_update_f upcall, void *cookie,
227 struct ptlrpc_request_set *rqset)
229 struct ptlrpc_request *req;
230 struct osc_setattr_args *sa;
235 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
239 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
241 ptlrpc_request_free(req);
245 osc_pack_req_body(req, oa);
247 ptlrpc_request_set_replen(req);
249 /* do mds to ost setattr asynchronously */
251 /* Do not wait for response. */
252 ptlrpcd_add_req(req);
254 req->rq_interpret_reply =
255 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* Async args live inside the request; must fit rq_async_args. */
257 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
258 sa = ptlrpc_req_async_args(req);
260 sa->sa_upcall = upcall;
261 sa->sa_cookie = cookie;
263 if (rqset == PTLRPCD_SET)
264 ptlrpcd_add_req(req);
266 ptlrpc_set_add_req(rqset, req);
/* Reply interpreter for OST_LADVISE: copy the reply obdo back to the
 * caller's obdo and invoke the upcall with the final rc. */
272 static int osc_ladvise_interpret(const struct lu_env *env,
273 struct ptlrpc_request *req,
276 struct osc_ladvise_args *la = arg;
277 struct ost_body *body;
283 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
285 GOTO(out, rc = -EPROTO);
287 *la->la_oa = body->oa;
289 rc = la->la_upcall(la->la_cookie, rc);
294 * If rqset is NULL, do not wait for response. Upcall and cookie could also
295 * be NULL in this case
/* Send an OST_LADVISE RPC carrying @ladvise_hdr and its lah_count advice
 * entries.  The LADVISE buffer is sized for num_advise entries before
 * packing; the request goes to the OST IO portal.  Completion behavior
 * mirrors osc_setattr_async(): fire-and-forget, ptlrpcd set, or caller's
 * rqset.
 * NOTE(review): branch structure around the two ptlrpcd_add_req() calls
 * is elided in this listing. */
297 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
298 struct ladvise_hdr *ladvise_hdr,
299 obd_enqueue_update_f upcall, void *cookie,
300 struct ptlrpc_request_set *rqset)
302 struct ptlrpc_request *req;
303 struct ost_body *body;
304 struct osc_ladvise_args *la;
306 struct lu_ladvise *req_ladvise;
307 struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
308 int num_advise = ladvise_hdr->lah_count;
309 struct ladvise_hdr *req_ladvise_hdr;
312 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
/* Size the variable-length advice array before packing. */
316 req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
317 num_advise * sizeof(*ladvise));
318 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
320 ptlrpc_request_free(req);
323 req->rq_request_portal = OST_IO_PORTAL;
324 ptlrpc_at_set_req_timeout(req);
326 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
328 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
331 req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
332 &RMF_OST_LADVISE_HDR);
333 memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
335 req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
336 memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
337 ptlrpc_request_set_replen(req);
340 /* Do not wait for response. */
341 ptlrpcd_add_req(req);
345 req->rq_interpret_reply = osc_ladvise_interpret;
346 CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
347 la = ptlrpc_req_async_args(req);
349 la->la_upcall = upcall;
350 la->la_cookie = cookie;
352 if (rqset == PTLRPCD_SET)
353 ptlrpcd_add_req(req);
355 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_CREATE.  Only reachable for echo-client objects — the
 * FID sequence is asserted to be an echo sequence; normal object
 * precreation happens elsewhere.  On success @oa is refreshed from the
 * reply and the blocksize is forced to the client BRW size. */
360 static int osc_create(const struct lu_env *env, struct obd_export *exp,
363 struct ptlrpc_request *req;
364 struct ost_body *body;
369 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
370 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
372 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
374 GOTO(out, rc = -ENOMEM);
376 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
378 ptlrpc_request_free(req);
382 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
385 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
387 ptlrpc_request_set_replen(req);
389 rc = ptlrpc_queue_wait(req);
393 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
395 GOTO(out_req, rc = -EPROTO);
397 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
398 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
400 oa->o_blksize = cli_brw_size(exp->exp_obd);
401 oa->o_valid |= OBD_MD_FLBLKSZ;
403 CDEBUG(D_HA, "transno: %lld\n",
404 lustre_msg_get_transno(req->rq_repmsg));
406 ptlrpc_req_finished(req);
/* Asynchronous OST_PUNCH (truncate/hole-punch).  The request is sent to
 * the OST IO portal and completed through osc_setattr_interpret(), which
 * then invokes @upcall(@cookie, rc).  Dispatch is via ptlrpcd or the
 * caller's @rqset. */
411 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
412 obd_enqueue_update_f upcall, void *cookie,
413 struct ptlrpc_request_set *rqset)
415 struct ptlrpc_request *req;
416 struct osc_setattr_args *sa;
417 struct ost_body *body;
421 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
425 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
427 ptlrpc_request_free(req);
430 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
431 ptlrpc_at_set_req_timeout(req);
433 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
435 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
437 ptlrpc_request_set_replen(req);
439 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
440 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
441 sa = ptlrpc_req_async_args(req);
443 sa->sa_upcall = upcall;
444 sa->sa_cookie = cookie;
445 if (rqset == PTLRPCD_SET)
446 ptlrpcd_add_req(req);
448 ptlrpc_set_add_req(rqset, req);
/* Reply interpreter for OST_SYNC: copy the reply obdo to the caller,
 * update the osc object's cached blocks attribute under the attr lock
 * if the server returned OBD_MD_FLBLOCKS, then run the upcall. */
453 static int osc_sync_interpret(const struct lu_env *env,
454 struct ptlrpc_request *req,
457 struct osc_fsync_args *fa = arg;
458 struct ost_body *body;
459 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
460 unsigned long valid = 0;
461 struct cl_object *obj;
467 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
469 CERROR("can't unpack ost_body\n");
470 GOTO(out, rc = -EPROTO);
473 *fa->fa_oa = body->oa;
474 obj = osc2cl(fa->fa_obj);
476 /* Update osc object's blocks attribute */
477 cl_object_attr_lock(obj);
478 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
479 attr->cat_blocks = body->oa.o_blocks;
484 cl_object_attr_update(env, obj, attr, valid);
485 cl_object_attr_unlock(obj);
488 rc = fa->fa_upcall(fa->fa_cookie, rc);
/* Asynchronous OST_SYNC for @obj.  The oa size/blocks fields are
 * overloaded to carry the start/end of the range to sync (see comment
 * below).  Completion goes through osc_sync_interpret() and then
 * @upcall(@cookie, rc). */
492 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
493 obd_enqueue_update_f upcall, void *cookie,
494 struct ptlrpc_request_set *rqset)
496 struct obd_export *exp = osc_export(obj);
497 struct ptlrpc_request *req;
498 struct ost_body *body;
499 struct osc_fsync_args *fa;
503 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
507 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
509 ptlrpc_request_free(req);
513 /* overload the size and blocks fields in the oa with start/end */
514 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
516 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
518 ptlrpc_request_set_replen(req);
519 req->rq_interpret_reply = osc_sync_interpret;
521 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
522 fa = ptlrpc_req_async_args(req);
525 fa->fa_upcall = upcall;
526 fa->fa_cookie = cookie;
528 if (rqset == PTLRPCD_SET)
529 ptlrpcd_add_req(req);
531 ptlrpc_set_add_req(rqset, req);
536 /* Find and cancel locally locks matched by @mode in the resource found by
537 * @objid. Found locks are added into @cancel list. Returns the amount of
538 * locks added to @cancels list. */
539 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
540 struct list_head *cancels,
541 enum ldlm_mode mode, __u64 lock_flags)
543 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
544 struct ldlm_res_id res_id;
545 struct ldlm_resource *res;
549 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
550 * export) but disabled through procfs (flag in NS).
552 * This distinguishes from a case when ELC is not supported originally,
553 * when we still want to cancel locks in advance and just cancel them
554 * locally, without sending any RPC. */
555 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* Build the resource name from the object id and collect matching
 * local locks into @cancels. */
558 ostid_build_res_name(&oa->o_oi, &res_id);
559 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
563 LDLM_RESOURCE_ADDREF(res);
564 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
565 lock_flags, 0, NULL);
566 LDLM_RESOURCE_DELREF(res);
567 ldlm_resource_putref(res);
/* Reply interpreter for OST_DESTROY: drop the in-flight destroy count
 * and wake anyone throttled in osc_destroy(). */
571 static int osc_destroy_interpret(const struct lu_env *env,
572 struct ptlrpc_request *req, void *data,
575 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
577 atomic_dec(&cli->cl_destroy_in_flight);
578 wake_up(&cli->cl_destroy_waitq);
/* Try to reserve a destroy-RPC slot: increment the in-flight counter and
 * succeed if it stays within cl_max_rpcs_in_flight; otherwise back the
 * increment out.  The inc/dec pair is not atomic as a whole, so a waiter
 * may need an extra wake-up (handled below). */
582 static int osc_can_send_destroy(struct client_obd *cli)
584 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
585 cli->cl_max_rpcs_in_flight) {
586 /* The destroy request can be sent */
589 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
590 cli->cl_max_rpcs_in_flight) {
592 * The counter has been modified between the two atomic
595 wake_up(&cli->cl_destroy_waitq);
/* OST_DESTROY: cancel local PW locks on the object (discarding dirty
 * data), pack an ELC request carrying those cancels, throttle to at most
 * cl_max_rpcs_in_flight concurrent destroys, then hand the request to
 * ptlrpcd without waiting for the reply. */
600 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
603 struct client_obd *cli = &exp->exp_obd->u.cli;
604 struct ptlrpc_request *req;
605 struct ost_body *body;
606 struct list_head cancels = LIST_HEAD_INIT(cancels);
611 CDEBUG(D_INFO, "oa NULL\n");
615 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
616 LDLM_FL_DISCARD_DATA);
618 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* Allocation failed: release the collected cancel locks. */
620 ldlm_lock_list_put(&cancels, l_bl_ast, count);
624 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
627 ptlrpc_request_free(req);
631 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
632 ptlrpc_at_set_req_timeout(req);
634 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
636 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
638 ptlrpc_request_set_replen(req);
640 req->rq_interpret_reply = osc_destroy_interpret;
641 if (!osc_can_send_destroy(cli)) {
642 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
645 * Wait until the number of on-going destroy RPCs drops
646 * under max_rpc_in_flight
648 rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
649 osc_can_send_destroy(cli), &lwi);
/* Interrupted while waiting: abandon the request. */
651 ptlrpc_req_finished(req);
656 /* Do not wait for response */
657 ptlrpcd_add_req(req);
/* Fill the dirty/grant accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) from the client_obd state so the server can manage
 * this client's grant.  Runs under cl_loi_list_lock; also performs sanity
 * CERROR checks on the dirty-page accounting. */
661 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
664 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
666 LASSERT(!(oa->o_valid & bits));
669 spin_lock(&cli->cl_loi_list_lock);
670 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
671 oa->o_dirty = cli->cl_dirty_grant;
673 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
674 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
675 cli->cl_dirty_max_pages)) {
676 CERROR("dirty %lu - %lu > dirty_max %lu\n",
677 cli->cl_dirty_pages, cli->cl_dirty_transit,
678 cli->cl_dirty_max_pages);
680 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
681 atomic_long_read(&obd_dirty_transit_pages) >
682 (long)(obd_max_dirty_pages + 1))) {
683 /* The atomic_read() allowing the atomic_inc() are
684 * not covered by a lock thus they may safely race and trip
685 * this CERROR() unless we add in a small fudge factor (+1). */
686 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
687 cli_name(cli), atomic_long_read(&obd_dirty_pages),
688 atomic_long_read(&obd_dirty_transit_pages),
689 obd_max_dirty_pages);
691 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
693 CERROR("dirty %lu - dirty_max %lu too big???\n",
694 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
697 unsigned long nrpages;
/* Ask for enough undirty grant to keep a full pipeline of RPCs
 * (max_rpcs_in_flight + 1) busy, or the dirty max if larger. */
699 nrpages = cli->cl_max_pages_per_rpc;
700 nrpages *= cli->cl_max_rpcs_in_flight + 1;
701 nrpages = max(nrpages, cli->cl_dirty_max_pages);
702 oa->o_undirty = nrpages << PAGE_SHIFT;
703 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
707 /* take extent tax into account when asking for more
709 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
710 cli->cl_max_extent_pages;
711 oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
714 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
715 oa->o_dropped = cli->cl_lost_grant;
716 cli->cl_lost_grant = 0;
717 spin_unlock(&cli->cl_loi_list_lock);
718 CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
719 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant-shrink check cl_grant_shrink_interval seconds
 * from now. */
722 void osc_update_next_shrink(struct client_obd *cli)
724 cli->cl_next_shrink_grant = ktime_get_seconds() +
725 cli->cl_grant_shrink_interval;
727 CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
728 cli->cl_next_shrink_grant);
/* Add @grant bytes back to the client's available grant, under the
 * loi list lock. */
731 static void __osc_update_grant(struct client_obd *cli, u64 grant)
733 spin_lock(&cli->cl_loi_list_lock);
734 cli->cl_avail_grant += grant;
735 spin_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server returned in an RPC reply body. */
738 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
740 if (body->oa.o_valid & OBD_MD_FLGRANT) {
741 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
742 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration; used by the grant-shrink path below and defined
 * later in this file. */
746 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
747 u32 keylen, void *key,
748 u32 vallen, void *val,
749 struct ptlrpc_request_set *set);
/* Reply interpreter for a grant-shrink set_info RPC.  On failure the
 * grant we tried to give back is restored locally; on success the reply
 * body is used to resync the grant. */
751 static int osc_shrink_grant_interpret(const struct lu_env *env,
752 struct ptlrpc_request *req,
755 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
756 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
757 struct ost_body *body;
760 __osc_update_grant(cli, oa->o_grant);
764 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
766 osc_update_grant(cli, body);
/* Piggy-back a small grant return (1/4 of avail) on an outgoing BRW:
 * move the amount from cl_avail_grant into oa->o_grant and flag the oa
 * with OBD_FL_SHRINK_GRANT so the server reclaims it. */
772 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
774 spin_lock(&cli->cl_loi_list_lock);
775 oa->o_grant = cli->cl_avail_grant / 4;
776 cli->cl_avail_grant -= oa->o_grant;
777 spin_unlock(&cli->cl_loi_list_lock);
778 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
779 oa->o_valid |= OBD_MD_FLFLAGS;
782 oa->o_flags |= OBD_FL_SHRINK_GRANT;
783 osc_update_next_shrink(cli);
786 /* Shrink the current grant, either from some large amount to enough for a
787 * full set of in-flight RPCs, or if we have already shrunk to that limit
788 * then to enough for a single RPC. This avoids keeping more grant than
789 * needed, and avoids shrinking the grant piecemeal. */
790 static int osc_shrink_grant(struct client_obd *cli)
792 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
793 (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
795 spin_lock(&cli->cl_loi_list_lock);
/* Already at/below the pipeline target: fall back to one RPC's worth. */
796 if (cli->cl_avail_grant <= target_bytes)
797 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
798 spin_unlock(&cli->cl_loi_list_lock);
800 return osc_shrink_grant_to_target(cli, target_bytes);
/* Give grant back to the server until cl_avail_grant == @target_bytes.
 * The excess is packed into an ost_body flagged OBD_FL_SHRINK_GRANT and
 * sent via a KEY_GRANT_SHRINK set_info RPC; on send failure the grant is
 * restored locally.  @target_bytes is clamped to at least one RPC. */
803 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
806 struct ost_body *body;
809 spin_lock(&cli->cl_loi_list_lock);
810 /* Don't shrink if we are already above or below the desired limit
811 * We don't want to shrink below a single RPC, as that will negatively
812 * impact block allocation and long-term performance. */
813 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
814 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
816 if (target_bytes >= cli->cl_avail_grant) {
817 spin_unlock(&cli->cl_loi_list_lock);
820 spin_unlock(&cli->cl_loi_list_lock);
826 osc_announce_cached(cli, &body->oa, 0);
828 spin_lock(&cli->cl_loi_list_lock);
829 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
830 cli->cl_avail_grant = target_bytes;
831 spin_unlock(&cli->cl_loi_list_lock);
832 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
833 body->oa.o_valid |= OBD_MD_FLFLAGS;
834 body->oa.o_flags = 0;
836 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
837 osc_update_next_shrink(cli);
839 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
840 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
841 sizeof(*body), body, NULL);
/* On failure, put the grant we tried to return back into avail. */
843 __osc_update_grant(cli, body->oa.o_grant);
/* Decide whether it is time to shrink this client's grant: requires the
 * server to support OBD_CONNECT_GRANT_SHRINK, the shrink deadline to be
 * (almost) reached, the import to be FULL, and more grant than one RPC
 * needs.  Otherwise just reschedule the next check. */
848 static int osc_should_shrink_grant(struct client_obd *client)
850 time64_t next_shrink = client->cl_next_shrink_grant;
852 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
853 OBD_CONNECT_GRANT_SHRINK) == 0)
856 if (ktime_get_seconds() >= next_shrink - 5) {
857 /* Get the current RPC size directly, instead of going via:
858 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
859 * Keep comment here so that it can be found by searching. */
860 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
862 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
863 client->cl_avail_grant > brw_size)
866 osc_update_next_shrink(client);
/* Periodic timeout callback: walk the registered clients and shrink the
 * grant of any that osc_should_shrink_grant() approves. */
871 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
873 struct client_obd *client;
875 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
876 if (osc_should_shrink_grant(client))
877 osc_shrink_grant(client);
/* Register @client with the periodic grant-shrink timeout machinery and
 * arm its first shrink deadline. */
882 static int osc_add_shrink_grant(struct client_obd *client)
886 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
888 osc_grant_shrink_grant_cb, NULL,
889 &client->cl_grant_shrink_list);
891 CERROR("add grant client %s error %d\n", cli_name(client), rc);
894 CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
895 osc_update_next_shrink(client);
/* Unregister @client from the periodic grant-shrink machinery. */
899 static int osc_del_shrink_grant(struct client_obd *client)
901 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize grant state from the server's connect data @ocd: compute
 * cl_avail_grant (subtracting reserved/dirty unless we were evicted),
 * derive the extent tax, chunk size and max extent pages when the server
 * supports GRANT_PARAM, and register for periodic grant shrinking if the
 * server supports it. */
905 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
908 * ocd_grant is the total grant amount we're expect to hold: if we've
909 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
910 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
913 * race is tolerable here: if we're evicted, but imp_state already
914 * left EVICTED state, then cl_dirty_pages must be 0 already.
916 spin_lock(&cli->cl_loi_list_lock);
917 cli->cl_avail_grant = ocd->ocd_grant;
918 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
919 cli->cl_avail_grant -= cli->cl_reserved_grant;
920 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
921 cli->cl_avail_grant -= cli->cl_dirty_grant;
923 cli->cl_avail_grant -=
924 cli->cl_dirty_pages << PAGE_SHIFT;
927 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
931 /* overhead for each extent insertion */
932 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
933 /* determine the appropriate chunk size used by osc_extent. */
934 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
935 ocd->ocd_grant_blkbits);
936 /* max_pages_per_rpc must be chunk aligned */
937 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
938 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
939 ~chunk_mask) & chunk_mask;
940 /* determine maximum extent size, in #pages */
941 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
942 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
943 if (cli->cl_max_extent_pages == 0)
944 cli->cl_max_extent_pages = 1;
/* Server does not publish grant parameters: use safe defaults. */
946 cli->cl_grant_extent_tax = 0;
947 cli->cl_chunkbits = PAGE_SHIFT;
948 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
950 spin_unlock(&cli->cl_loi_list_lock);
952 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
953 "chunk bits: %d cl_max_extent_pages: %d\n",
955 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
956 cli->cl_max_extent_pages);
958 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
959 list_empty(&cli->cl_grant_shrink_list))
960 osc_add_shrink_grant(cli);
963 /* We assume that the reason this OSC got a short read is because it read
964 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
965 * via the LOV, and it _knows_ it's reading inside the file, it's just that
966 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the tail of the page array beyond @nob_read bytes: skip the
 * pages fully read, zero the remainder of a partially-read page, then
 * zero every following page.  Pages are accessed via kmap/kunmap. */
967 static void handle_short_read(int nob_read, size_t page_count,
968 struct brw_page **pga)
973 /* skip bytes read OK */
974 while (nob_read > 0) {
975 LASSERT (page_count > 0);
977 if (pga[i]->count > nob_read) {
978 /* EOF inside this page */
979 ptr = kmap(pga[i]->pg) +
980 (pga[i]->off & ~PAGE_MASK);
981 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
988 nob_read -= pga[i]->count;
993 /* zero remaining pages */
994 while (page_count-- > 0) {
995 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
996 memset(ptr, 0, pga[i]->count);
/* Validate a BRW_WRITE reply: the per-niobuf RC vector must be present,
 * contain no errors (negative values are returned to the caller, any
 * nonzero value is -EPROTO), and the bulk transfer must have moved
 * exactly @requested_nob bytes. */
1002 static int check_write_rcs(struct ptlrpc_request *req,
1003 int requested_nob, int niocount,
1004 size_t page_count, struct brw_page **pga)
1009 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1010 sizeof(*remote_rcs) *
1012 if (remote_rcs == NULL) {
1013 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1017 /* return error if any niobuf was in error */
1018 for (i = 0; i < niocount; i++) {
1019 if ((int)remote_rcs[i] < 0)
1020 return(remote_rcs[i]);
1022 if (remote_rcs[i] != 0) {
1023 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1024 i, remote_rcs[i], req);
1029 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1030 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1031 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages can share one niobuf iff they are byte-contiguous and
 * their flags differ only in bits known to be safe to combine (grant,
 * cache, sync and quota hints).  Unexpected flag differences are warned
 * about but still block the merge. */
1038 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1040 if (p1->flag != p2->flag) {
1041 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1042 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1043 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1045 /* warn if we try to combine flags that we don't know to be
1046 * safe to combine */
1047 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1048 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1049 "report this at https://jira.hpdd.intel.com/\n",
1050 p1->flag, p2->flag);
1055 return (p1->off + p1->count == p2->off);
/* Compute the bulk checksum over @nob bytes spread across @pga using the
 * algorithm selected by @cksum_type.  Contains two fault-injection hooks:
 * OBD_FAIL_OSC_CHECKSUM_RECEIVE corrupts the first read page before
 * hashing, and OBD_FAIL_OSC_CHECKSUM_SEND perturbs only the checksum on
 * writes (so the data stays correct for a resend). */
1058 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1059 struct brw_page **pga, int opc,
1060 enum cksum_types cksum_type)
1064 struct cfs_crypto_hash_desc *hdesc;
1065 unsigned int bufsize;
1066 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1068 LASSERT(pg_count > 0);
1070 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1071 if (IS_ERR(hdesc)) {
1072 CERROR("Unable to initialize checksum hash %s\n",
1073 cfs_crypto_hash_name(cfs_alg));
1074 return PTR_ERR(hdesc);
1077 while (nob > 0 && pg_count > 0) {
1078 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1080 /* corrupt the data before we compute the checksum, to
1081 * simulate an OST->client data error */
1082 if (i == 0 && opc == OST_READ &&
1083 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1084 unsigned char *ptr = kmap(pga[i]->pg);
1085 int off = pga[i]->off & ~PAGE_MASK;
1087 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1090 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1091 pga[i]->off & ~PAGE_MASK,
1093 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1094 (int)(pga[i]->off & ~PAGE_MASK));
1096 nob -= pga[i]->count;
1101 bufsize = sizeof(cksum);
1102 cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1104 /* For sending we only compute the wrong checksum instead
1105 * of corrupting the data so it is still correct on a redo */
1106 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a bulk read/write (BRW) RPC for @page_count pages in @pga:
 * allocate the request (writes come from the shared request pool),
 * count mergeable pages into niobufs, prepare the bulk descriptor,
 * pack obdo/ioobj/niobuf buffers, announce dirty/grant state, compute
 * and stash the bulk checksum for writes, and return the prepared
 * request in *reqp.  @resend marks the oa with OBD_FL_RECOV_RESEND.
 * NOTE(review): several error-path and bookkeeping lines are elided in
 * this listing. */
1113 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1114 u32 page_count, struct brw_page **pga,
1115 struct ptlrpc_request **reqp, int resend)
1117 struct ptlrpc_request *req;
1118 struct ptlrpc_bulk_desc *desc;
1119 struct ost_body *body;
1120 struct obd_ioobj *ioobj;
1121 struct niobuf_remote *niobuf;
1122 int niocount, i, requested_nob, opc, rc;
1123 struct osc_brw_async_args *aa;
1124 struct req_capsule *pill;
1125 struct brw_page *pg_prev;
1128 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1129 RETURN(-ENOMEM); /* Recoverable */
1130 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1131 RETURN(-EINVAL); /* Fatal */
1133 if ((cmd & OBD_BRW_WRITE) != 0) {
/* Writes draw from the preallocated request pool so dirty
 * flush cannot deadlock on allocation. */
1135 req = ptlrpc_request_alloc_pool(cli->cl_import,
1137 &RQF_OST_BRW_WRITE);
1140 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* Count how many niobufs are needed: adjacent mergeable pages
 * collapse into one. */
1145 for (niocount = i = 1; i < page_count; i++) {
1146 if (!can_merge_pages(pga[i - 1], pga[i]))
1150 pill = &req->rq_pill;
1151 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1153 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1154 niocount * sizeof(*niobuf));
1156 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1158 ptlrpc_request_free(req);
1161 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1162 ptlrpc_at_set_req_timeout(req);
1163 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1165 req->rq_no_retry_einprogress = 1;
1167 desc = ptlrpc_prep_bulk_imp(req, page_count,
1168 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1169 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1170 PTLRPC_BULK_PUT_SINK) |
1171 PTLRPC_BULK_BUF_KIOV,
1173 &ptlrpc_bulk_kiov_pin_ops);
1176 GOTO(out, rc = -ENOMEM);
1177 /* NB request now owns desc and will free it when it gets freed */
1179 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1180 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1181 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1182 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1184 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1186 obdo_to_ioobj(oa, ioobj);
1187 ioobj->ioo_bufcnt = niocount;
1188 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1189 * that might be send for this request. The actual number is decided
1190 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1191 * "max - 1" for old client compatibility sending "0", and also so the
1192 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1193 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1194 LASSERT(page_count > 0);
1196 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1197 struct brw_page *pg = pga[i];
1198 int poff = pg->off & ~PAGE_MASK;
1200 LASSERT(pg->count > 0);
1201 /* make sure there is no gap in the middle of page array */
1202 LASSERTF(page_count == 1 ||
1203 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1204 ergo(i > 0 && i < page_count - 1,
1205 poff == 0 && pg->count == PAGE_SIZE) &&
1206 ergo(i == page_count - 1, poff == 0)),
1207 "i: %d/%d pg: %p off: %llu, count: %u\n",
1208 i, page_count, pg, pg->off, pg->count);
1209 LASSERTF(i == 0 || pg->off > pg_prev->off,
1210 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1211 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1213 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1214 pg_prev->pg, page_private(pg_prev->pg),
1215 pg_prev->pg->index, pg_prev->off);
1216 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1217 (pg->flag & OBD_BRW_SRVLOCK));
1219 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1220 requested_nob += pg->count;
1222 if (i > 0 && can_merge_pages(pg_prev, pg)) {
/* Contiguous with the previous page: extend the
 * current niobuf instead of starting a new one. */
1224 niobuf->rnb_len += pg->count;
1226 niobuf->rnb_offset = pg->off;
1227 niobuf->rnb_len = pg->count;
1228 niobuf->rnb_flags = pg->flag;
1233 LASSERTF((void *)(niobuf - niocount) ==
1234 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1235 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1236 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1238 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1240 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1241 body->oa.o_valid |= OBD_MD_FLFLAGS;
1242 body->oa.o_flags = 0;
1244 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1247 if (osc_should_shrink_grant(cli))
1248 osc_shrink_grant_local(cli, &body->oa);
1250 /* size[REQ_REC_OFF] still sizeof (*body) */
1251 if (opc == OST_WRITE) {
1252 if (cli->cl_checksum &&
1253 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1254 /* store cl_cksum_type in a local variable since
1255 * it can be changed via lprocfs */
1256 enum cksum_types cksum_type = cli->cl_cksum_type;
1258 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1259 body->oa.o_flags = 0;
1261 body->oa.o_flags |= cksum_type_pack(cksum_type);
1262 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1263 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1267 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1269 /* save this in 'oa', too, for later checking */
1270 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1271 oa->o_flags |= cksum_type_pack(cksum_type);
1273 /* clear out the checksum flag, in case this is a
1274 * resend but cl_checksum is no longer set. b=11238 */
1275 oa->o_valid &= ~OBD_MD_FLCKSUM;
1277 oa->o_cksum = body->oa.o_cksum;
1278 /* 1 RC per niobuf */
1279 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1280 sizeof(__u32) * niocount);
1282 if (cli->cl_checksum &&
1283 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1284 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1285 body->oa.o_flags = 0;
1286 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1287 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1290 /* Client cksum has been already copied to wire obdo in previous
1291 * lustre_set_wire_obdo(), and in the case a bulk-read is being
1292 * resent due to cksum error, this will allow Server to
1293 * check+dump pages on its side */
1295 ptlrpc_request_set_replen(req);
1297 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1298 aa = ptlrpc_req_async_args(req);
1300 aa->aa_requested_nob = requested_nob;
1301 aa->aa_nio_count = niocount;
1302 aa->aa_page_count = page_count;
1306 INIT_LIST_HEAD(&aa->aa_oaps);
1309 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1310 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1311 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1312 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1316 ptlrpc_req_finished(req);
/* File-scope scratch buffer for the checksum-dump file path (PATH_MAX). */
1320 char dbgcksum_file_name[PATH_MAX];

/*
 * Dump the raw pages of a bulk transfer that failed a checksum comparison
 * into a debug file so the corrupted data can be inspected post mortem.
 * The file is created under libcfs_debug_file_path_arr (or the default
 * libcfs debug path) and named from the parent FID, extent range and the
 * client/server checksums.
 *
 * NOTE(review): this excerpt elides several lines (local declarations,
 * braces, some error/cleanup paths); comments describe only visible code.
 */
1322 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1323 struct brw_page **pga, __u32 server_cksum,
1332 /* will only keep dump of pages on first error for the same range in
1333 * file/fid, not during the resends/retries. */
1334 snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1335 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1336 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1337 libcfs_debug_file_path_arr :
1338 LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1339 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1340 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1341 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1343 pga[page_count-1]->off + pga[page_count-1]->count - 1,
1344 client_cksum, server_cksum);
/* O_EXCL: a dump for this exact range/checksum pair already on disk is
 * kept; we deliberately refuse to overwrite it (first-error-wins). */
1345 filp = filp_open(dbgcksum_file_name,
1346 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1350 CDEBUG(D_INFO, "%s: can't open to dump pages with "
1351 "checksum error: rc = %d\n", dbgcksum_file_name,
1354 CERROR("%s: can't open to dump pages with checksum "
1355 "error: rc = %d\n", dbgcksum_file_name, rc);
/* write every page of the transfer, mapping each into the kernel
 * address space with kmap() in turn */
1361 for (i = 0; i < page_count; i++) {
1362 len = pga[i]->count;
1363 buf = kmap(pga[i]->pg);
1365 rc = vfs_write(filp, (__force const char __user *)buf,
1368 CERROR("%s: wanted to write %u but got %d "
1369 "error\n", dbgcksum_file_name, len, rc);
1374 CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1375 dbgcksum_file_name, rc);
/* flush the dump to stable storage before returning */
1381 rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1383 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1384 filp_close(filp, NULL);
/*
 * Verify a bulk-write checksum returned by the server against the one the
 * client computed when the request was sent.  On mismatch, optionally dump
 * the pages (cl_checksum_dump), re-checksum the pages as they are *now*,
 * and log a console error classifying where the corruption most likely
 * happened (client after checksumming, in transit, or a protocol problem).
 *
 * NOTE(review): the return-type line and some declarations/return paths
 * are elided in this excerpt; the visible early path handles the match.
 */
1389 check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1390 __u32 client_cksum, __u32 server_cksum,
1391 struct osc_brw_async_args *aa)
1395 enum cksum_types cksum_type;
/* fast path: server agrees with what we sent */
1397 if (server_cksum == client_cksum) {
1398 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* keep a copy of the offending pages for offline analysis, if enabled */
1402 if (aa->aa_cli->cl_checksum_dump)
1403 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1404 server_cksum, client_cksum);
/* re-checksum the pages in their current state, using the checksum type
 * the server reported, to work out where the data changed */
1406 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1408 new_cksum = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1409 aa->aa_ppga, OST_WRITE, cksum_type);
/* classify the failure for the console message below */
1411 if (cksum_type != cksum_type_unpack(aa->aa_oa->o_flags))
1412 msg = "the server did not use the checksum type specified in "
1413 "the original request - likely a protocol problem";
1414 else if (new_cksum == server_cksum)
1415 msg = "changed on the client after we checksummed it - "
1416 "likely false positive due to mmap IO (bug 11742)";
1417 else if (new_cksum == client_cksum)
1418 msg = "changed in transit before arrival at OST";
1420 msg = "changed in transit AND doesn't match the original - "
1421 "likely false positive due to mmap IO (bug 11742)";
1423 LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1424 DFID " object "DOSTID" extent [%llu-%llu], original "
1425 "client csum %x (type %x), server csum %x (type %x),"
1426 " client csum now %x\n",
1427 aa->aa_cli->cl_import->imp_obd->obd_name,
1428 msg, libcfs_nid2str(peer->nid),
1429 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1430 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1431 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1432 POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1433 aa->aa_ppga[aa->aa_page_count - 1]->off +
1434 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1435 client_cksum, cksum_type_unpack(aa->aa_oa->o_flags),
1436 server_cksum, cksum_type, new_cksum);
1440 /* Note rc enters this function as number of bytes transferred */
/*
 * Finish a bulk read/write RPC: unpack the reply body, update quota and
 * grant state, verify checksums (write: compare against the server's;
 * read: recompute locally and compare), handle short reads, and copy the
 * wire obdo back into the in-memory one.
 *
 * NOTE(review): this excerpt elides lines (braces, GOTO/RETURN paths,
 * some declarations such as 'router'); comments cover visible code only.
 */
1441 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1443 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1444 const struct lnet_process_id *peer =
1445 &req->rq_import->imp_connection->c_peer;
1446 struct client_obd *cli = aa->aa_cli;
1447 struct ost_body *body;
1448 u32 client_cksum = 0;
/* -EDQUOT is handled below so quota flags still get updated */
1451 if (rc < 0 && rc != -EDQUOT) {
1452 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1456 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1457 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1459 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1463 /* set/clear over quota flag for a uid/gid/projid */
1464 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1465 body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1466 unsigned qid[LL_MAXQUOTAS] = {
1467 body->oa.o_uid, body->oa.o_gid,
1468 body->oa.o_projid };
1469 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1470 body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1471 body->oa.o_valid, body->oa.o_flags);
1472 osc_quota_setdq(cli, qid, body->oa.o_valid,
1476 osc_update_grant(cli, body);
/* remember the checksum we sent so it can be compared below */
1481 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1482 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1484 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1486 CERROR("Unexpected +ve rc %d\n", rc);
1489 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1491 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* compare our write checksum with the server's echo of it */
1494 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1495 check_write_checksum(&body->oa, peer, client_cksum,
1496 body->oa.o_cksum, aa))
1499 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1500 aa->aa_page_count, aa->aa_ppga);
1504 /* The rest of this function executes only for OST_READs */
1506 /* if unwrap_bulk failed, return -EAGAIN to retry */
1507 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1509 GOTO(out, rc = -EAGAIN);
/* sanity: server cannot return more than we asked for */
1511 if (rc > aa->aa_requested_nob) {
1512 CERROR("Unexpected rc %d (%d requested)\n", rc,
1513 aa->aa_requested_nob);
1517 if (rc != req->rq_bulk->bd_nob_transferred) {
1518 CERROR ("Unexpected rc %d (%d transferred)\n",
1519 rc, req->rq_bulk->bd_nob_transferred);
/* a short read is legitimate (e.g. EOF); zero-fill the remainder */
1523 if (rc < aa->aa_requested_nob)
1524 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* read path: recompute the checksum over what actually arrived */
1526 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1527 static int cksum_counter;
1528 u32 server_cksum = body->oa.o_cksum;
1531 enum cksum_types cksum_type;
1533 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1534 body->oa.o_flags : 0);
1535 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1536 aa->aa_ppga, OST_READ,
/* note when the bulk came through an LNet router, for the error text */
1539 if (peer->nid != req->rq_bulk->bd_sender) {
1541 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1544 if (server_cksum != client_cksum) {
1545 struct ost_body *clbody;
1546 u32 page_count = aa->aa_page_count;
1548 clbody = req_capsule_client_get(&req->rq_pill,
1550 if (cli->cl_checksum_dump)
1551 dump_all_bulk_pages(&clbody->oa, page_count,
1552 aa->aa_ppga, server_cksum,
1555 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1556 "%s%s%s inode "DFID" object "DOSTID
1557 " extent [%llu-%llu], client %x, "
1558 "server %x, cksum_type %x\n",
1559 req->rq_import->imp_obd->obd_name,
1560 libcfs_nid2str(peer->nid),
1562 clbody->oa.o_valid & OBD_MD_FLFID ?
1563 clbody->oa.o_parent_seq : 0ULL,
1564 clbody->oa.o_valid & OBD_MD_FLFID ?
1565 clbody->oa.o_parent_oid : 0,
1566 clbody->oa.o_valid & OBD_MD_FLFID ?
1567 clbody->oa.o_parent_ver : 0,
1568 POSTID(&body->oa.o_oi),
1569 aa->aa_ppga[0]->off,
1570 aa->aa_ppga[page_count-1]->off +
1571 aa->aa_ppga[page_count-1]->count - 1,
1572 client_cksum, server_cksum,
1575 aa->aa_oa->o_cksum = client_cksum;
1579 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1582 } else if (unlikely(client_cksum)) {
1583 static int cksum_missed;
/* log only at power-of-two miss counts to avoid console spam */
1586 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1587 CERROR("Checksum %u requested from %s but not sent\n",
1588 cksum_missed, libcfs_nid2str(peer->nid));
/* propagate server-side attribute updates back into our obdo */
1594 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1595 aa->aa_oa, &body->oa);
/*
 * Rebuild and resend a BRW RPC that failed with a recoverable error
 * (e.g. -EINPROGRESS).  A brand-new request is prepared from the saved
 * async args; the page array, extent list and callbacks are transferred
 * from the old request to the new one, and the new request is handed to
 * ptlrpcd.
 *
 * NOTE(review): braces, RETURN paths and a few statements are elided in
 * this excerpt; comments describe visible code only.
 */
1600 static int osc_brw_redo_request(struct ptlrpc_request *request,
1601 struct osc_brw_async_args *aa, int rc)
1603 struct ptlrpc_request *new_req;
1604 struct osc_brw_async_args *new_aa;
1605 struct osc_async_page *oap;
1608 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1609 "redo for recoverable error %d", rc);
/* build a fresh RPC of the same kind (read/write) from the saved args */
1611 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1612 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1613 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1614 aa->aa_ppga, &new_req, 1);
/* abandon the resend if any page's IO was interrupted */
1618 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1619 if (oap->oap_request != NULL) {
1620 LASSERTF(request == oap->oap_request,
1621 "request %p != oap_request %p\n",
1622 request, oap->oap_request);
1623 if (oap->oap_interrupted) {
1624 ptlrpc_req_finished(new_req);
1629 /* New request takes over pga and oaps from old request.
1630 * Note that copying a list_head doesn't work, need to move it... */
1632 new_req->rq_interpret_reply = request->rq_interpret_reply;
1633 new_req->rq_async_args = request->rq_async_args;
1634 new_req->rq_commit_cb = request->rq_commit_cb;
1635 /* cap resend delay to the current request timeout, this is similar to
1636 * what ptlrpc does (see after_reply()) */
1637 if (aa->aa_resends > new_req->rq_timeout)
1638 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1640 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1641 new_req->rq_generation_set = 1;
1642 new_req->rq_import_generation = request->rq_import_generation;
/* splice the oap and extent lists onto the new async args */
1644 new_aa = ptlrpc_req_async_args(new_req);
1646 INIT_LIST_HEAD(&new_aa->aa_oaps);
1647 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1648 INIT_LIST_HEAD(&new_aa->aa_exts);
1649 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1650 new_aa->aa_resends = aa->aa_resends;
/* repoint each page's request reference from the old RPC to the new */
1652 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1653 if (oap->oap_request) {
1654 ptlrpc_req_finished(oap->oap_request);
1655 oap->oap_request = ptlrpc_request_addref(new_req);
1659 /* XXX: This code will run into problem if we're going to support
1660 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1661 * and wait for all of them to be finished. We should inherit request
1662 * set from old request. */
1663 ptlrpcd_add_req(new_req);
1665 DEBUG_REQ(D_INFO, new_req, "new request");
1670 * ugh, we want disk allocation on the target to happen in offset order. we'll
1671 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1672 * fine for our small page arrays and doesn't require allocation. its an
1673 * insertion sort that swaps elements that are strides apart, shrinking the
1674 * stride down until its '1' and the array is sorted.
/* Shellsort of the brw_page array by ->off, ascending; 3h+1 stride sequence.
 * NOTE(review): the do-loop opener, stride shrink and swap lines are elided
 * in this excerpt. */
1676 static void sort_brw_pages(struct brw_page **array, int num)
1679 struct brw_page *tmp;
/* build the largest 3h+1 stride that still fits the array */
1683 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1688 for (i = stride ; i < num ; i++) {
/* insertion step: shift stride-distant larger elements right */
1691 while (j >= stride && array[j - stride]->off > tmp->off) {
1692 array[j] = array[j - stride];
1697 } while (stride > 1);
/* Free the brw_page pointer array allocated by osc_build_rpc().
 * 'count' must match the element count used at allocation time. */
1700 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1702 LASSERT(ppga != NULL);
1703 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Reply-interpret callback for BRW RPCs.  Finishes the request, resends on
 * recoverable errors, updates cached object attributes (size/KMS/times) on
 * success, finishes all extents, releases the page array, and decrements
 * the in-flight RPC counters.
 *
 * NOTE(review): braces, GOTO/RETURN paths and a few statements are elided
 * in this excerpt; comments cover visible code only.
 */
1706 static int brw_interpret(const struct lu_env *env,
1707 struct ptlrpc_request *req, void *data, int rc)
1709 struct osc_brw_async_args *aa = data;
1710 struct osc_extent *ext;
1711 struct osc_extent *tmp;
1712 struct client_obd *cli = aa->aa_cli;
1715 rc = osc_brw_fini_request(req, rc);
1716 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1717 /* When server return -EINPROGRESS, client should always retry
1718 * regardless of the number of times the bulk was resent already. */
1719 if (osc_recoverable_error(rc)) {
/* a generation mismatch means we were evicted; don't resend */
1720 if (req->rq_import_generation !=
1721 req->rq_import->imp_generation) {
1722 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1723 ""DOSTID", rc = %d.\n",
1724 req->rq_import->imp_obd->obd_name,
1725 POSTID(&aa->aa_oa->o_oi), rc);
1726 } else if (rc == -EINPROGRESS ||
1727 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1728 rc = osc_brw_redo_request(req, aa, rc);
1730 CERROR("%s: too many resent retries for object: "
1731 "%llu:%llu, rc = %d.\n",
1732 req->rq_import->imp_obd->obd_name,
1733 POSTID(&aa->aa_oa->o_oi), rc);
1738 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* success: fold the attributes returned by the server into the
 * cl_object cache */
1743 struct obdo *oa = aa->aa_oa;
1744 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1745 unsigned long valid = 0;
1746 struct cl_object *obj;
1747 struct osc_async_page *last;
1749 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1750 obj = osc2cl(last->oap_obj);
1752 cl_object_attr_lock(obj);
1753 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1754 attr->cat_blocks = oa->o_blocks;
1755 valid |= CAT_BLOCKS;
1757 if (oa->o_valid & OBD_MD_FLMTIME) {
1758 attr->cat_mtime = oa->o_mtime;
1761 if (oa->o_valid & OBD_MD_FLATIME) {
1762 attr->cat_atime = oa->o_atime;
1765 if (oa->o_valid & OBD_MD_FLCTIME) {
1766 attr->cat_ctime = oa->o_ctime;
1770 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1771 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1772 loff_t last_off = last->oap_count + last->oap_obj_off +
1775 /* Change file size if this is an out of quota or
1776 * direct IO write and it extends the file size */
1777 if (loi->loi_lvb.lvb_size < last_off) {
1778 attr->cat_size = last_off;
1781 /* Extend KMS if it's not a lockless write */
1782 if (loi->loi_kms < last_off &&
1783 oap2osc_page(last)->ops_srvlock == 0) {
1784 attr->cat_kms = last_off;
1790 cl_object_attr_update(env, obj, attr, valid);
1791 cl_object_attr_unlock(obj);
1793 OBDO_FREE(aa->aa_oa);
/* writes that completed must be tracked as unstable until commit */
1795 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1796 osc_inc_unstable_pages(req);
1798 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1799 list_del_init(&ext->oe_link);
1800 osc_extent_finish(env, ext, 1, rc);
1802 LASSERT(list_empty(&aa->aa_exts));
1803 LASSERT(list_empty(&aa->aa_oaps));
1805 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1806 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1808 spin_lock(&cli->cl_loi_list_lock);
1809 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1810 * is called so we know whether to go to sync BRWs or wait for more
1811 * RPCs to complete */
1812 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1813 cli->cl_w_in_flight--;
1815 cli->cl_r_in_flight--;
1816 osc_wake_cache_waiters(cli);
1817 spin_unlock(&cli->cl_loi_list_lock);
/* kick the IO engine in case more work became eligible */
1819 osc_io_unplug(env, cli, NULL);
/*
 * Commit callback for BRW write RPCs: once the server has committed the
 * transaction, the request's pages are no longer "unstable".  rq_lock
 * serializes against the interpret path setting rq_unstable.
 */
1823 static void brw_commit(struct ptlrpc_request *req)
1825 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1826 * this called via the rq_commit_cb, I need to ensure
1827 * osc_dec_unstable_pages is still called. Otherwise unstable
1828 * pages may be leaked. */
1829 spin_lock(&req->rq_lock)
1830 if (likely(req->rq_unstable)) {
1831 req->rq_unstable = 0;
1832 spin_unlock(&req->rq_lock);
/* drop the lock before the (potentially heavier) accounting call */
1834 osc_dec_unstable_pages(req);
/* raced with osc_inc_unstable_pages: let it do the decrement */
1836 req->rq_committed = 1;
1837 spin_unlock(&req->rq_lock);
1842 * Build an RPC by the list of extent @ext_list. The caller must ensure
1843 * that the total pages in this list are NOT over max pages per RPC.
1844 * Extents in the list must be in OES_RPC state.
/* NOTE(review): braces, some declarations (i, rc, mpflag, grant,
 * page_count, mem_tight) and GOTO/RETURN paths are elided in this
 * excerpt; comments below describe only visible code. */
1846 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1847 struct list_head *ext_list, int cmd)
1849 struct ptlrpc_request *req = NULL;
1850 struct osc_extent *ext;
1851 struct brw_page **pga = NULL;
1852 struct osc_brw_async_args *aa = NULL;
1853 struct obdo *oa = NULL;
1854 struct osc_async_page *oap;
1855 struct osc_object *obj = NULL;
1856 struct cl_req_attr *crattr = NULL;
1857 loff_t starting_offset = OBD_OBJECT_EOF;
1858 loff_t ending_offset = 0;
1862 bool soft_sync = false;
1863 bool interrupted = false;
1867 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1868 struct ost_body *body;
1870 LASSERT(!list_empty(ext_list));
1872 /* add pages into rpc_list to build BRW rpc */
/* first pass: accumulate grant/page totals and memalloc state */
1873 list_for_each_entry(ext, ext_list, oe_link) {
1874 LASSERT(ext->oe_state == OES_RPC);
1875 mem_tight |= ext->oe_memalloc;
1876 grant += ext->oe_grants;
1877 page_count += ext->oe_nr_pages;
1882 soft_sync = osc_over_unstable_soft_limit(cli);
1884 mpflag = cfs_memory_pressure_get_and_set();
1886 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1888 GOTO(out, rc = -ENOMEM);
1892 GOTO(out, rc = -ENOMEM);
/* second pass: fill the page array, compute the RPC's extent range */
1895 list_for_each_entry(ext, ext_list, oe_link) {
1896 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1898 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1900 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1901 pga[i] = &oap->oap_brw_page;
1902 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1905 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1906 if (starting_offset == OBD_OBJECT_EOF ||
1907 starting_offset > oap->oap_obj_off)
1908 starting_offset = oap->oap_obj_off;
1910 LASSERT(oap->oap_page_off == 0);
1911 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1912 ending_offset = oap->oap_obj_off +
1915 LASSERT(oap->oap_page_off + oap->oap_count ==
1917 if (oap->oap_interrupted)
1922 /* first page in the list */
1923 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item)
1925 crattr = &osc_env_info(env)->oti_req_attr;
1926 memset(crattr, 0, sizeof(*crattr));
1927 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1928 crattr->cra_flags = ~0ULL;
1929 crattr->cra_page = oap2cl_page(oap);
1930 crattr->cra_oa = oa;
1931 cl_req_attr_set(env, osc2cl(obj), crattr);
1933 if (cmd == OBD_BRW_WRITE)
1934 oa->o_grant_used = grant;
/* pages must be in offset order for the server's disk allocation */
1936 sort_brw_pages(pga, page_count);
1937 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1939 CERROR("prep_req failed: %d\n", rc);
1943 req->rq_commit_cb = brw_commit;
1944 req->rq_interpret_reply = brw_interpret;
1945 req->rq_memalloc = mem_tight != 0;
1946 oap->oap_request = ptlrpc_request_addref(req);
1947 if (interrupted && !req->rq_intr)
1948 ptlrpc_mark_interrupted(req);
1950 /* Need to update the timestamps after the request is built in case
1951 * we race with setattr (locally or in queue at OST). If OST gets
1952 * later setattr before earlier BRW (as determined by the request xid),
1953 * the OST will not use BRW timestamps. Sadly, there is no obvious
1954 * way to do this in a single call. bug 10150 */
1955 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1956 crattr->cra_oa = &body->oa;
1957 crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1958 cl_req_attr_set(env, osc2cl(obj), crattr);
1959 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1961 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1962 aa = ptlrpc_req_async_args(req);
1963 INIT_LIST_HEAD(&aa->aa_oaps);
1964 list_splice_init(&rpc_list, &aa->aa_oaps);
1965 INIT_LIST_HEAD(&aa->aa_exts);
1966 list_splice_init(ext_list, &aa->aa_exts);
/* account the RPC in the per-client stats/histograms */
1968 spin_lock(&cli->cl_loi_list_lock);
1969 starting_offset >>= PAGE_SHIFT;
1970 if (cmd == OBD_BRW_READ) {
1971 cli->cl_r_in_flight++;
1972 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1973 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1974 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1975 starting_offset + 1);
1977 cli->cl_w_in_flight++;
1978 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1979 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1980 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1981 starting_offset + 1);
1983 spin_unlock(&cli->cl_loi_list_lock);
1985 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1986 page_count, aa, cli->cl_r_in_flight,
1987 cli->cl_w_in_flight);
1988 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
1990 ptlrpcd_add_req(req);
/* error path: undo allocations and fail every extent we were given */
1996 cfs_memory_pressure_restore(mpflag);
1999 LASSERT(req == NULL);
2004 OBD_FREE(pga, sizeof(*pga) * page_count);
2005 /* this should happen rarely and is pretty bad, it makes the
2006 * pending list not follow the dirty order */
2007 while (!list_empty(ext_list)) {
2008 ext = list_entry(ext_list->next, struct osc_extent,
2010 list_del_init(&ext->oe_link);
2011 osc_extent_finish(env, ext, 0, rc);
/*
 * Attach @data (the osc object) to a DLM lock's l_ast_data, but only if it
 * is unset or already equal to @data.  Visible code returns under
 * lock_res_and_lock(); the success/failure return values themselves are
 * elided in this excerpt.
 */
2017 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2021 LASSERT(lock != NULL);
2023 lock_res_and_lock(lock);
2025 if (lock->l_ast_data == NULL)
2026 lock->l_ast_data = data;
2027 if (lock->l_ast_data == data)
2030 unlock_res_and_lock(lock);
/*
 * Common completion for a lock enqueue: translate an intent-aborted reply
 * into its real error code, mark the LVB ready where appropriate, invoke
 * the caller's upcall, and drop the reference ldlm_cli_enqueue() took.
 *
 * NOTE(review): braces and RETURN lines are elided in this excerpt.
 */
2035 static int osc_enqueue_fini(struct ptlrpc_request *req,
2036 osc_enqueue_upcall_f upcall, void *cookie,
2037 struct lustre_handle *lockh, enum ldlm_mode mode,
2038 __u64 *flags, bool speculative, int errcode)
2040 bool intent = *flags & LDLM_FL_HAS_INTENT;
2044 /* The request was created before ldlm_cli_enqueue call. */
2045 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2046 struct ldlm_reply *rep;
2048 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2049 LASSERT(rep != NULL);
/* the server sends lock_policy_res1 in network byte order */
2051 rep->lock_policy_res1 =
2052 ptlrpc_status_ntoh(rep->lock_policy_res1);
2053 if (rep->lock_policy_res1)
2054 errcode = rep->lock_policy_res1;
2056 *flags |= LDLM_FL_LVB_READY;
2057 } else if (errcode == ELDLM_OK) {
2058 *flags |= LDLM_FL_LVB_READY;
2061 /* Call the update callback. */
2062 rc = (*upcall)(cookie, lockh, errcode);
2064 /* release the reference taken in ldlm_cli_enqueue() */
2065 if (errcode == ELDLM_LOCK_MATCHED)
2067 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2068 ldlm_lock_decref(lockh, mode);
/*
 * Reply-interpret callback for an asynchronous lock enqueue.  Completes
 * the DLM-level enqueue (ldlm_cli_enqueue_fini), then the OSC-level
 * bookkeeping (osc_enqueue_fini), holding an extra lock reference across
 * both so a blocking AST cannot overtake the upcall.
 *
 * NOTE(review): braces, a 'flags' declaration and RETURN lines are elided
 * in this excerpt.
 */
2073 static int osc_enqueue_interpret(const struct lu_env *env,
2074 struct ptlrpc_request *req,
2075 struct osc_enqueue_args *aa, int rc)
2077 struct ldlm_lock *lock;
2078 struct lustre_handle *lockh = &aa->oa_lockh;
2079 enum ldlm_mode mode = aa->oa_mode;
2080 struct ost_lvb *lvb = aa->oa_lvb;
2081 __u32 lvb_len = sizeof(*lvb);
2086 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2088 lock = ldlm_handle2lock(lockh);
2089 LASSERTF(lock != NULL,
2090 "lockh %#llx, req %p, aa %p - client evicted?\n",
2091 lockh->cookie, req, aa);
2093 /* Take an additional reference so that a blocking AST that
2094 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2095 * to arrive after an upcall has been executed by
2096 * osc_enqueue_fini(). */
2097 ldlm_lock_addref(lockh, mode);
2099 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2100 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2102 /* Let CP AST to grant the lock first. */
2103 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* speculative enqueues carry no LVB/flags from the caller; supply a
 * local flags word for the fini calls below */
2105 if (aa->oa_speculative) {
2106 LASSERT(aa->oa_lvb == NULL);
2107 LASSERT(aa->oa_flags == NULL);
2108 aa->oa_flags = &flags;
2111 /* Complete obtaining the lock procedure. */
2112 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2113 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2115 /* Complete osc stuff. */
2116 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2117 aa->oa_flags, aa->oa_speculative, rc);
2119 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* drop the extra reference taken above */
2121 ldlm_lock_decref(lockh, mode);
2122 LDLM_LOCK_PUT(lock);
/* Sentinel request-set pointer: "hand the request to ptlrpcd" rather than
 * adding it to a caller-owned set (compared by address below). */
2126 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2128 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2129 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2130 * other synchronous requests, however keeping some locks and trying to obtain
2131 * others may take a considerable amount of time in a case of ost failure; and
2132 * when other sync requests do not get released lock from a client, the client
2133 * is evicted from the cluster -- such scenarious make the life difficult, so
2134 * release locks just after they are obtained. */
/* NOTE(review): braces, GOTO/RETURN lines and the 'speculative' parameter
 * line appear elided in this excerpt; comments describe visible code. */
2135 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2136 __u64 *flags, union ldlm_policy_data *policy,
2137 struct ost_lvb *lvb, int kms_valid,
2138 osc_enqueue_upcall_f upcall, void *cookie,
2139 struct ldlm_enqueue_info *einfo,
2140 struct ptlrpc_request_set *rqset, int async,
2143 struct obd_device *obd = exp->exp_obd;
2144 struct lustre_handle lockh = { 0 };
2145 struct ptlrpc_request *req = NULL;
2146 int intent = *flags & LDLM_FL_HAS_INTENT;
2147 __u64 match_flags = *flags;
2148 enum ldlm_mode mode;
2152 /* Filesystem lock extents are extended to page boundaries so that
2153 * dealing with the page cache is a little smoother. */
2154 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2155 policy->l_extent.end |= ~PAGE_MASK;
2158 * kms is not valid when either object is completely fresh (so that no
2159 * locks are cached), or object was evicted. In the latter case cached
2160 * lock cannot be used, because it would prime inode state with
2161 * potentially stale LVB.
2166 /* Next, search for already existing extent locks that will cover us */
2167 /* If we're trying to read, we also search for an existing PW lock. The
2168 * VFS and page cache already protect us locally, so lots of readers/
2169 * writers can share a single PW lock.
2171 * There are problems with conversion deadlocks, so instead of
2172 * converting a read lock to a write lock, we'll just enqueue a new
2175 * At some point we should cancel the read lock instead of making them
2176 * send us a blocking callback, but there are problems with canceling
2177 * locks out from other users right now, too. */
2178 mode = einfo->ei_mode;
2179 if (einfo->ei_mode == LCK_PR)
2181 /* Normal lock requests must wait for the LVB to be ready before
2182 * matching a lock; speculative lock requests do not need to,
2183 * because they will not actually use the lock. */
2185 match_flags |= LDLM_FL_LVB_READY;
2187 match_flags |= LDLM_FL_BLOCK_GRANTED;
2188 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2189 einfo->ei_type, policy, mode, &lockh, 0);
/* a matching lock was found: reuse it instead of a new enqueue */
2191 struct ldlm_lock *matched;
2193 if (*flags & LDLM_FL_TEST_LOCK)
2196 matched = ldlm_handle2lock(&lockh);
2198 /* This DLM lock request is speculative, and does not
2199 * have an associated IO request. Therefore if there
2200 * is already a DLM lock, it wll just inform the
2201 * caller to cancel the request for this stripe.*/
2202 lock_res_and_lock(matched);
2203 if (ldlm_extent_equal(&policy->l_extent,
2204 &matched->l_policy_data.l_extent))
2208 unlock_res_and_lock(matched);
2210 ldlm_lock_decref(&lockh, mode);
2211 LDLM_LOCK_PUT(matched);
2213 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2214 *flags |= LDLM_FL_LVB_READY;
2216 /* We already have a lock, and it's referenced. */
2217 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2219 ldlm_lock_decref(&lockh, mode);
2220 LDLM_LOCK_PUT(matched);
2223 ldlm_lock_decref(&lockh, mode);
2224 LDLM_LOCK_PUT(matched);
2229 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
/* no match: build a fresh enqueue request carrying an LVB buffer */
2233 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2234 &RQF_LDLM_ENQUEUE_LVB);
2238 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2240 ptlrpc_request_free(req);
2244 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2246 ptlrpc_request_set_replen(req);
2249 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2250 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2252 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2253 sizeof(*lvb), LVB_T_OST, &lockh, async);
/* async path: stash arguments and let osc_enqueue_interpret finish */
2256 struct osc_enqueue_args *aa;
2257 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2258 aa = ptlrpc_req_async_args(req);
2260 aa->oa_mode = einfo->ei_mode;
2261 aa->oa_type = einfo->ei_type;
2262 lustre_handle_copy(&aa->oa_lockh, &lockh);
2263 aa->oa_upcall = upcall;
2264 aa->oa_cookie = cookie;
2265 aa->oa_speculative = speculative;
2267 aa->oa_flags = flags;
2270 /* speculative locks are essentially to enqueue
2271 * a DLM lock in advance, so we don't care
2272 * about the result of the enqueue. */
2274 aa->oa_flags = NULL;
2277 req->rq_interpret_reply =
2278 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2279 if (rqset == PTLRPCD_SET)
2280 ptlrpcd_add_req(req);
2282 ptlrpc_set_add_req(rqset, req);
2283 } else if (intent) {
2284 ptlrpc_req_finished(req);
/* sync path: complete the enqueue inline */
2289 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2290 flags, speculative, rc);
2292 ptlrpc_req_finished(req);
/*
 * Match an existing cached extent lock for the given resource/extent,
 * after page-aligning the extent, and attach @data to the matched lock.
 * Returns the matched mode (ldlm_lock_match semantics) or 0; a lock whose
 * data cannot be set is dropped again.
 *
 * NOTE(review): braces, 'rc' initialization (the requested mode(s)) and
 * RETURN lines are elided in this excerpt.
 */
2297 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2298 enum ldlm_type type, union ldlm_policy_data *policy,
2299 enum ldlm_mode mode, __u64 *flags, void *data,
2300 struct lustre_handle *lockh, int unref)
2302 struct obd_device *obd = exp->exp_obd;
2303 __u64 lflags = *flags;
2307 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2310 /* Filesystem lock extents are extended to page boundaries so that
2311 * dealing with the page cache is a little smoother */
2312 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2313 policy->l_extent.end |= ~PAGE_MASK;
2315 /* Next, search for already existing extent locks that will cover us */
2316 /* If we're trying to read, we also search for an existing PW lock. The
2317 * VFS and page cache already protect us locally, so lots of readers/
2318 * writers can share a single PW lock. */
2322 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2323 res_id, type, policy, rc, lockh, unref)
2324 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
/* matched: bind our object data to the lock, or give it back */
2328 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2330 LASSERT(lock != NULL);
2331 if (!osc_set_lock_data(lock, data)) {
2332 ldlm_lock_decref(lockh, rc);
2335 LDLM_LOCK_PUT(lock);
/*
 * Reply-interpret callback for an async OST_STATFS: copy the server's
 * obd_statfs into the caller's buffer and invoke the oi_cb_up callback.
 *
 * NOTE(review): braces, GOTO targets and RETURN lines are elided in this
 * excerpt.
 */
2340 static int osc_statfs_interpret(const struct lu_env *env,
2341 struct ptlrpc_request *req,
2342 struct osc_async_args *aa, int rc)
2344 struct obd_statfs *msfs;
2348 /* The request has in fact never been sent
2349 * due to issues at a higher level (LOV).
2350 * Exit immediately since the caller is
2351 * aware of the problem and takes care
2352 * of the clean up */
/* NODELAY statfs tolerates a temporarily unreachable OST */
2355 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2356 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2362 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2364 GOTO(out, rc = -EPROTO);
2367 *aa->aa_oi->oi_osfs = *msfs;
2369 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Fire an OST_STATFS RPC asynchronously via @rqset; the reply is handled
 * by osc_statfs_interpret().  OBD_STATFS_NODELAY requests disable resend
 * and delay so procfs readers cannot hang on a dead OST.
 *
 * NOTE(review): braces, some error/RETURN lines and the aa->aa_oi
 * assignment are elided in this excerpt.
 */
2373 static int osc_statfs_async(struct obd_export *exp,
2374 struct obd_info *oinfo, __u64 max_age,
2375 struct ptlrpc_request_set *rqset)
2377 struct obd_device *obd = class_exp2obd(exp);
2378 struct ptlrpc_request *req;
2379 struct osc_async_args *aa;
2383 /* We could possibly pass max_age in the request (as an absolute
2384 * timestamp or a "seconds.usec ago") so the target can avoid doing
2385 * extra calls into the filesystem if that isn't necessary (e.g.
2386 * during mount that would help a bit). Having relative timestamps
2387 * is not so great if request processing is slow, while absolute
2388 * timestamps are not ideal because they need time synchronization. */
2389 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2393 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2395 ptlrpc_request_free(req);
2398 ptlrpc_request_set_replen(req);
2399 req->rq_request_portal = OST_CREATE_PORTAL;
2400 ptlrpc_at_set_req_timeout(req);
2402 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2403 /* procfs requests not want stat in wait for avoid deadlock */
2404 req->rq_no_resend = 1;
2405 req->rq_no_delay = 1;
2408 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2409 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2410 aa = ptlrpc_req_async_args(req);
2413 ptlrpc_set_add_req(rqset, req);
/* Synchronous statfs: send OST_STATFS and wait for the reply, copying the
 * result into @osfs. Mirrors osc_statfs_async() but uses
 * ptlrpc_queue_wait() instead of an interpret callback. */
2417 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2418 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2420 struct obd_device *obd = class_exp2obd(exp);
2421 struct obd_statfs *msfs;
2422 struct ptlrpc_request *req;
2423 struct obd_import *imp = NULL;
2427 /*Since the request might also come from lprocfs, so we need
2428 *sync this with client_disconnect_export Bug15684*/
/* Take an import reference under cl_sem so a concurrent disconnect
 * cannot free the import out from under us. */
2429 down_read(&obd->u.cli.cl_sem);
2430 if (obd->u.cli.cl_import)
2431 imp = class_import_get(obd->u.cli.cl_import);
2432 up_read(&obd->u.cli.cl_sem);
2436 /* We could possibly pass max_age in the request (as an absolute
2437 * timestamp or a "seconds.usec ago") so the target can avoid doing
2438 * extra calls into the filesystem if that isn't necessary (e.g.
2439 * during mount that would help a bit). Having relative timestamps
2440 * is not so great if request processing is slow, while absolute
2441 * timestamps are not ideal because they need time synchronization. */
2442 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* The request holds its own import ref now; drop ours. */
2444 class_import_put(imp);
2449 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2451 ptlrpc_request_free(req);
2454 ptlrpc_request_set_replen(req);
2455 req->rq_request_portal = OST_CREATE_PORTAL;
2456 ptlrpc_at_set_req_timeout(req);
2458 if (flags & OBD_STATFS_NODELAY) {
2459 /* procfs requests not want stat in wait for avoid deadlock */
2460 req->rq_no_resend = 1;
2461 req->rq_no_delay = 1;
/* Block until the RPC completes (or fails). */
2464 rc = ptlrpc_queue_wait(req);
2468 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
/* No/short reply body: protocol error. */
2470 GOTO(out, rc = -EPROTO);
2477 ptlrpc_req_finished(req);
/* obd ioctl dispatcher for the OSC device. Pins the module for the
 * duration of the call; unknown commands return -ENOTTY. */
2481 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2482 void *karg, void __user *uarg)
2484 struct obd_device *obd = exp->exp_obd;
2485 struct obd_ioctl_data *data = karg;
/* Hold a module reference so the OSC module cannot unload while an
 * ioctl is in flight. */
2489 if (!try_module_get(THIS_MODULE)) {
2490 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2491 module_name(THIS_MODULE));
/* Kick import recovery using the NID supplied by the caller. */
2495 case OBD_IOC_CLIENT_RECOVER:
2496 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2497 data->ioc_inlbuf1, 0);
/* (De)activate the import; activation argument not visible here. */
2501 case IOC_OSC_SET_ACTIVE:
2502 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2505 case OBD_IOC_PING_TARGET:
2506 err = ptlrpc_obd_ping(obd);
2509 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2510 cmd, current_comm());
2511 GOTO(out, err = -ENOTTY);
2514 module_put(THIS_MODULE);
/* Handle set_info requests on the OSC. Several keys are consumed locally
 * (checksum toggle, sptlrpc config, ctx flush, LRU cache attach/shrink);
 * everything else is forwarded to the OST as an OST_SET_INFO RPC.
 * KEY_GRANT_SHRINK requests go through ptlrpcd with their own interpret
 * callback; other forwarded keys require a caller-supplied @set. */
2518 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2519 u32 keylen, void *key,
2520 u32 vallen, void *val,
2521 struct ptlrpc_request_set *set)
2523 struct ptlrpc_request *req;
2524 struct obd_device *obd = exp->exp_obd;
2525 struct obd_import *imp = class_exp2cliimp(exp);
/* Fault-injection point for shutdown testing. */
2530 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* Locally handled: toggle client-side checksumming. */
2532 if (KEY_IS(KEY_CHECKSUM)) {
2533 if (vallen != sizeof(int))
2535 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
/* Locally handled: re-read sptlrpc configuration. */
2539 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2540 sptlrpc_conf_client_adapt(obd);
/* Locally handled: flush this client's security contexts. */
2544 if (KEY_IS(KEY_FLUSH_CTX)) {
2545 sptlrpc_import_flush_my_ctx(imp);
/* Locally handled: attach the shared client page cache (once per osc)
 * and join the cache's LRU list of OSCs. */
2549 if (KEY_IS(KEY_CACHE_SET)) {
2550 struct client_obd *cli = &obd->u.cli;
2552 LASSERT(cli->cl_cache == NULL); /* only once */
2553 cli->cl_cache = (struct cl_client_cache *)val;
2554 cl_cache_incref(cli->cl_cache);
2555 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2557 /* add this osc into entity list */
2558 LASSERT(list_empty(&cli->cl_lru_osc));
2559 spin_lock(&cli->cl_cache->ccc_lru_lock);
2560 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2561 spin_unlock(&cli->cl_cache->ccc_lru_lock);
/* Locally handled: shrink the LRU by min(half of in-list, target). */
2566 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2567 struct client_obd *cli = &obd->u.cli;
2568 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2569 long target = *(long *)val;
2571 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* Forwarded keys (other than grant-shrink) need a request set. */
2576 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2579 /* We pass all other commands directly to OST. Since nobody calls osc
2580 methods directly and everybody is supposed to go through LOV, we
2581 assume lov checked invalid values for us.
2582 The only recognised values so far are evict_by_nid and mds_conn.
2583 Even if something bad goes through, we'd get a -EINVAL from OST
2586 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2587 &RQF_OST_SET_GRANT_INFO :
2592 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2593 RCL_CLIENT, keylen);
2594 if (!KEY_IS(KEY_GRANT_SHRINK))
2595 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2596 RCL_CLIENT, vallen);
2597 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2599 ptlrpc_request_free(req);
/* Copy key and value into the outgoing request buffers. */
2603 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2604 memcpy(tmp, key, keylen);
2605 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2608 memcpy(tmp, val, vallen);
/* Grant shrink: stash the ost_body in async args and set the
 * shrink-grant interpret callback. */
2610 if (KEY_IS(KEY_GRANT_SHRINK)) {
2611 struct osc_grant_args *aa;
2614 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2615 aa = ptlrpc_req_async_args(req);
2618 ptlrpc_req_finished(req);
2621 *oa = ((struct ost_body *)val)->oa;
2623 req->rq_interpret_reply = osc_shrink_grant_interpret;
2626 ptlrpc_request_set_replen(req);
/* Non-grant-shrink: queue on the caller's set; grant-shrink goes
 * straight to ptlrpcd. */
2627 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2628 LASSERT(set != NULL);
2629 ptlrpc_set_add_req(set, req);
2630 ptlrpc_check_set(NULL, set);
2632 ptlrpcd_add_req(req);
/* Reconnect hook: recompute the grant to request from the server
 * (available + reserved [+ dirty], never zero), and reset lost-grant
 * accounting under cl_loi_list_lock. */
2638 static int osc_reconnect(const struct lu_env *env,
2639 struct obd_export *exp, struct obd_device *obd,
2640 struct obd_uuid *cluuid,
2641 struct obd_connect_data *data,
2644 struct client_obd *cli = &obd->u.cli;
2646 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2650 spin_lock(&cli->cl_loi_list_lock);
2651 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
/* With GRANT_PARAM the server understands byte-granular dirty
 * grant; otherwise account dirty pages in page units. */
2652 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2653 grant += cli->cl_dirty_grant;
2655 grant += cli->cl_dirty_pages << PAGE_SHIFT;
/* Never ask for 0 grant; fall back to twice the BRW size. */
2656 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2657 lost_grant = cli->cl_lost_grant;
2658 cli->cl_lost_grant = 0;
2659 spin_unlock(&cli->cl_loi_list_lock);
2661 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2662 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2663 data->ocd_version, data->ocd_grant, lost_grant);
/* Disconnect hook: run the generic client disconnect first, then remove
 * this osc from the grant-shrink list only once the import is gone (see
 * the BUG18662 ordering note below). */
2669 static int osc_disconnect(struct obd_export *exp)
2671 struct obd_device *obd = class_exp2obd(exp);
2674 rc = client_disconnect_export(exp);
2676 * Initially we put del_shrink_grant before disconnect_export, but it
2677 * causes the following problem if setup (connect) and cleanup
2678 * (disconnect) are tangled together.
2679 * connect p1 disconnect p2
2680 * ptlrpc_connect_import
2681 * ............... class_manual_cleanup
2684 * ptlrpc_connect_interrupt
2686 * add this client to shrink list
2688 * Bang! pinger trigger the shrink.
2689 * So the osc should be disconnected from the shrink list, after we
2690 * are sure the import has been destroyed. BUG18662
2692 if (obd->u.cli.cl_import == NULL)
2693 osc_del_shrink_grant(&obd->u.cli);
/* cfs_hash iterator callback: for one ldlm resource, find the osc object
 * attached to any granted lock (via l_ast_data), clear LDLM_FL_CLEANED
 * on all granted locks so a later namespace cleanup can cancel them, and
 * invalidate the osc object. Called from osc_import_event() on
 * IMP_EVENT_INVALIDATE. */
2697 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2698 struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2700 struct lu_env *env = arg;
2701 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2702 struct ldlm_lock *lock;
2703 struct osc_object *osc = NULL;
2707 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
/* Grab (and reference) the osc object from the first lock that
 * carries ast data; all locks on a resource share the object. */
2708 if (lock->l_ast_data != NULL && osc == NULL) {
2709 osc = lock->l_ast_data;
2710 cl_object_get(osc2cl(osc));
2713 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2714 * by the 2nd round of ldlm_namespace_clean() call in
2715 * osc_import_event(). */
2716 ldlm_clear_cleaned(lock);
2721 osc_object_invalidate(env, osc);
2722 cl_object_put(env, osc2cl(osc));
/* Import state-machine event handler for the OSC: resets grant on
 * disconnect, invalidates cached locks/objects on invalidate, applies
 * connect data on OCD, and forwards active/inactive/(de)activate events
 * to the obd observer. */
2728 static int osc_import_event(struct obd_device *obd,
2729 struct obd_import *imp,
2730 enum obd_import_event event)
2732 struct client_obd *cli;
2736 LASSERT(imp->imp_obd == obd);
2739 case IMP_EVENT_DISCON: {
/* Connection lost: all outstanding grant is void. */
2741 spin_lock(&cli->cl_loi_list_lock);
2742 cli->cl_avail_grant = 0;
2743 cli->cl_lost_grant = 0;
2744 spin_unlock(&cli->cl_loi_list_lock);
2747 case IMP_EVENT_INACTIVE: {
2748 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
2751 case IMP_EVENT_INVALIDATE: {
2752 struct ldlm_namespace *ns = obd->obd_namespace;
/* First cleanup pass drops local-only locks... */
2756 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2758 env = cl_env_get(&refcheck);
/* ...flush pending I/O and invalidate every cached object... */
2760 osc_io_unplug(env, &obd->u.cli, NULL);
2762 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2763 osc_ldlm_resource_invalidate,
2765 cl_env_put(env, &refcheck);
/* ...then a second pass cancels locks un-CLEANED by the walk
 * above (see osc_ldlm_resource_invalidate()). */
2767 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2772 case IMP_EVENT_ACTIVE: {
2773 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
2776 case IMP_EVENT_OCD: {
2777 struct obd_connect_data *ocd = &imp->imp_connect_data;
2779 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2780 osc_init_grant(&obd->u.cli, ocd);
2783 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2784 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2786 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
2789 case IMP_EVENT_DEACTIVATE: {
2790 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
2793 case IMP_EVENT_ACTIVATE: {
2794 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
2798 CERROR("Unknown import event %d\n", event);
2805 * Determine whether the lock can be canceled before replaying the lock
2806 * during recovery, see bug16774 for detailed information.
2808 * \retval zero the lock can't be canceled
2809 * \retval other ok to cancel
2811 static int osc_cancel_weight(struct ldlm_lock *lock)
2814 * Cancel all unused and granted extent lock.
/* "unused" here means osc_ldlm_weigh_ast() reports zero weight. */
2816 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2817 lock->l_granted_mode == lock->l_req_mode &&
2818 osc_ldlm_weigh_ast(lock) == 0)
/* ptlrpcd work callback (cl_writeback_work): flush pending writeback for
 * this client obd by unplugging its I/O queue. @data is the client_obd
 * passed at ptlrpcd_alloc_work() time in osc_setup(). */
2824 static int brw_queue_work(const struct lu_env *env, void *data)
2826 struct client_obd *cli = data;
2828 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2830 osc_io_unplug(env, cli, NULL);
/* Set up an OSC obd device: generic client setup, writeback and LRU
 * ptlrpcd work items, quota, procfs entries (with special handling when
 * co-located with an OSP), request-pool top-up, grant-shrink interval,
 * cancel-weight registration, and registration on the global shrink
 * list. Error paths unwind the work items and the client setup. */
2834 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2836 struct client_obd *cli = &obd->u.cli;
2837 struct obd_type *type;
2845 rc = ptlrpcd_addref();
2849 rc = client_obd_setup(obd, lcfg);
2851 GOTO(out_ptlrpcd, rc);
/* Writeback flusher runs brw_queue_work() via ptlrpcd. */
2853 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2854 if (IS_ERR(handler))
2855 GOTO(out_client_setup, rc = PTR_ERR(handler));
2856 cli->cl_writeback_work = handler;
/* Separate work item for LRU page reclaim. */
2858 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2859 if (IS_ERR(handler))
2860 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2861 cli->cl_lru_work = handler;
2863 rc = osc_quota_setup(obd);
2865 GOTO(out_ptlrpcd_work, rc);
2867 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2869 #ifdef CONFIG_PROC_FS
2870 obd->obd_vars = lprocfs_osc_obd_vars;
2872 /* If this is true then both client (osc) and server (osp) are on the
2873 * same node. The osp layer if loaded first will register the osc proc
2874 * directory. In that case this obd_device will be attached its proc
2875 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot.
2877 type = class_search_type(LUSTRE_OSP_NAME);
2878 if (type && type->typ_procsym) {
2879 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2881 obd->obd_vars, obd);
2882 if (IS_ERR(obd->obd_proc_entry)) {
2883 rc = PTR_ERR(obd->obd_proc_entry);
/* proc failure is logged but non-fatal for setup. */
2884 CERROR("error %d setting up lprocfs for %s\n", rc,
2886 obd->obd_proc_entry = NULL;
2890 rc = lprocfs_obd_setup(obd, false);
2892 /* If the basic OSC proc tree construction succeeded then
2895 lproc_osc_attach_seqstat(obd);
2896 sptlrpc_lprocfs_cliobd_attach(obd);
2897 ptlrpc_lprocfs_register_obd(obd);
2901 * We try to control the total number of requests with a upper limit
2902 * osc_reqpool_maxreqcount. There might be some race which will cause
2903 * over-limit allocation, but it is fine.
2905 req_count = atomic_read(&osc_pool_req_count);
2906 if (req_count < osc_reqpool_maxreqcount) {
/* Reserve max_rpcs_in_flight + 2 requests for this client, capped
 * at the global pool limit. */
2907 adding = cli->cl_max_rpcs_in_flight + 2;
2908 if (req_count + adding > osc_reqpool_maxreqcount)
2909 adding = osc_reqpool_maxreqcount - req_count;
2911 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2912 atomic_add(added, &osc_pool_req_count);
2915 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2916 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
/* Make this client visible to the global cache shrinker. */
2918 spin_lock(&osc_shrink_lock);
2919 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2920 spin_unlock(&osc_shrink_lock);
/* --- error unwind --- */
2925 if (cli->cl_writeback_work != NULL) {
2926 ptlrpcd_destroy_work(cli->cl_writeback_work);
2927 cli->cl_writeback_work = NULL;
2929 if (cli->cl_lru_work != NULL) {
2930 ptlrpcd_destroy_work(cli->cl_lru_work);
2931 cli->cl_lru_work = NULL;
2934 client_obd_cleanup(obd);
/* Pre-cleanup: wait for zombie exports, tear down ptlrpcd work items,
 * clean up the client import, and remove procfs entries. Runs before
 * osc_cleanup(). */
2940 static int osc_precleanup(struct obd_device *obd)
2942 struct client_obd *cli = &obd->u.cli;
2946 * for echo client, export may be on zombie list, wait for
2947 * zombie thread to cull it, because cli.cl_import will be
2948 * cleared in client_disconnect_export():
2949 * class_export_destroy() -> obd_cleanup() ->
2950 * echo_device_free() -> echo_client_cleanup() ->
2951 * obd_disconnect() -> osc_disconnect() ->
2952 * client_disconnect_export()
2954 obd_zombie_barrier();
2955 if (cli->cl_writeback_work) {
2956 ptlrpcd_destroy_work(cli->cl_writeback_work);
2957 cli->cl_writeback_work = NULL;
2960 if (cli->cl_lru_work) {
2961 ptlrpcd_destroy_work(cli->cl_lru_work);
2962 cli->cl_lru_work = NULL;
2965 obd_cleanup_client_import(obd);
2966 ptlrpc_lprocfs_unregister_obd(obd);
2967 lprocfs_obd_cleanup(obd);
/* Final cleanup: leave the global shrink list, detach from the shared
 * client cache (dropping our reference), free the quota cache, and run
 * generic client cleanup. */
2971 int osc_cleanup(struct obd_device *obd)
2973 struct client_obd *cli = &obd->u.cli;
2978 spin_lock(&osc_shrink_lock);
2979 list_del(&cli->cl_shrink_list);
2980 spin_unlock(&osc_shrink_lock);
/* Undo the KEY_CACHE_SET attachment done in osc_set_info_async(). */
2983 if (cli->cl_cache != NULL) {
2984 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2985 spin_lock(&cli->cl_cache->ccc_lru_lock);
2986 list_del_init(&cli->cl_lru_osc);
2987 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2988 cli->cl_lru_left = NULL;
2989 cl_cache_decref(cli->cl_cache);
2990 cli->cl_cache = NULL;
2993 /* free memory of osc quota cache */
2994 osc_quota_cleanup(obd);
2996 rc = client_obd_cleanup(obd);
/* Apply an lctl conf_param to this OSC via the generic proc-param
 * machinery; positive return values (param consumed) are mapped to 0. */
3002 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3004 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3005 return rc > 0 ? 0: rc;
/* obd_ops wrapper: forward the config buffer to osc_process_config_base().
 * @len is unused here. */
3008 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3010 return osc_process_config_base(obd, buf);
/* Method table registered with class_register_type() in osc_init();
 * connect/add_conn/del_conn reuse the generic client implementations. */
3013 static struct obd_ops osc_obd_ops = {
3014 .o_owner = THIS_MODULE,
3015 .o_setup = osc_setup,
3016 .o_precleanup = osc_precleanup,
3017 .o_cleanup = osc_cleanup,
3018 .o_add_conn = client_import_add_conn,
3019 .o_del_conn = client_import_del_conn,
3020 .o_connect = client_connect_import,
3021 .o_reconnect = osc_reconnect,
3022 .o_disconnect = osc_disconnect,
3023 .o_statfs = osc_statfs,
3024 .o_statfs_async = osc_statfs_async,
3025 .o_create = osc_create,
3026 .o_destroy = osc_destroy,
3027 .o_getattr = osc_getattr,
3028 .o_setattr = osc_setattr,
3029 .o_iocontrol = osc_iocontrol,
3030 .o_set_info_async = osc_set_info_async,
3031 .o_import_event = osc_import_event,
3032 .o_process_config = osc_process_config,
3033 .o_quotactl = osc_quotactl,
/* Global memory-shrinker state: the registered shrinker handle, plus the
 * list of all client_obds eligible for cache shrinking (clients join in
 * osc_setup(), leave in osc_cleanup()) and the lock protecting it. */
3036 static struct shrinker *osc_cache_shrinker;
3037 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3038 DEFINE_SPINLOCK(osc_shrink_lock);
/* Compatibility wrapper for kernels whose shrinker API has a single
 * combined callback instead of separate count/scan methods: build a
 * shrink_control from the old-style arguments, run the scan, and return
 * the remaining count. */
3040 #ifndef HAVE_SHRINKER_COUNT
3041 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3043 struct shrink_control scv = {
3044 .nr_to_scan = shrink_param(sc, nr_to_scan),
3045 .gfp_mask = shrink_param(sc, gfp_mask)
3047 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3048 struct shrinker *shrinker = NULL;
3051 (void)osc_cache_shrink_scan(shrinker, &scv);
3053 return osc_cache_shrink_count(shrinker, &scv);
/* Module init: set up lu_kmem caches, register the OSC obd type (without
 * proc entries if an OSP is co-located and already registered them),
 * install the cache shrinker, size the shared request pool from the
 * osc_reqpool_mem_max module parameter, and create the pool. Unwinds
 * type registration and caches on failure. */
3057 static int __init osc_init(void)
3059 bool enable_proc = true;
3060 struct obd_type *type;
3061 unsigned int reqpool_size;
3062 unsigned int reqsize;
3064 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3065 osc_cache_shrink_count, osc_cache_shrink_scan);
3068 /* print an address of _any_ initialized kernel symbol from this
3069 * module, to allow debugging with gdb that doesn't support data
3070 * symbols from modules.*/
3071 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3073 rc = lu_kmem_init(osc_caches);
/* If the OSP type already registered the shared proc directory,
 * don't register proc entries for the OSC type too. */
3077 type = class_search_type(LUSTRE_OSP_NAME);
3078 if (type != NULL && type->typ_procsym != NULL)
3079 enable_proc = false;
3081 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3082 LUSTRE_OSC_NAME, &osc_device_type);
3086 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3088 /* This is obviously too much memory, only prevent overflow here */
/* Reject 0 and >= 4096 MB (the << 20 below would overflow a uint). */
3089 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3090 GOTO(out_type, rc = -EINVAL);
3092 reqpool_size = osc_reqpool_mem_max << 20;
/* Round the per-request size up to a power of two covering
 * OST_IO_MAXREQSIZE. */
3095 while (reqsize < OST_IO_MAXREQSIZE)
3096 reqsize = reqsize << 1;
3099 * We don't enlarge the request count in OSC pool according to
3100 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3101 * tried after normal allocation failed. So a small OSC pool won't
3102 * cause much performance degression in most of cases.
3104 osc_reqpool_maxreqcount = reqpool_size / reqsize;
3106 atomic_set(&osc_pool_req_count, 0);
3107 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3108 ptlrpc_add_rqs_to_pool);
3110 if (osc_rq_pool != NULL)
/* --- error unwind: drop the type, then the caches --- */
3114 class_unregister_type(LUSTRE_OSC_NAME);
3116 lu_kmem_fini(osc_caches);
/* Module exit: undo osc_init() in reverse order — shrinker, obd type,
 * lu_kmem caches, request pool. */
3121 static void __exit osc_exit(void)
3123 remove_shrinker(osc_cache_shrinker);
3124 class_unregister_type(LUSTRE_OSC_NAME);
3125 lu_kmem_fini(osc_caches);
3126 ptlrpc_free_rq_pool(osc_rq_pool);
/* Standard kernel module metadata and entry points. */
3129 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3130 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3131 MODULE_VERSION(LUSTRE_VERSION_STRING);
3132 MODULE_LICENSE("GPL");
3134 module_init(osc_init);
3135 module_exit(osc_exit);