/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2015, Intel Corporation.
 *
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#include <lustre/lustre_user.h>

#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <lustre_param.h>

#include <obd_cksum.h>
#include <obd_class.h>

#include "osc_cl_internal.h"
#include "osc_internal.h"
atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);
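/*
 * Illustrative sizing note (not part of the module interface): the memory
 * cap above is converted into a pooled-request count when the pool is
 * created, roughly
 *
 *	osc_reqpool_maxreqcount = (osc_reqpool_mem_max << 20) / request_size;
 *
 * where request_size is the size of a pooled BRW request. The exact
 * divisor is an assumption here; see the pool setup in this file's init
 * code (not shown in this excerpt).
 */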
struct osc_brw_async_args {
        struct obdo             *aa_oa;
        int                      aa_requested_nob;
        int                      aa_nio_count;
        u32                      aa_page_count;
        int                      aa_resends;
        struct brw_page        **aa_ppga;
        struct client_obd       *aa_cli;
        struct list_head         aa_oaps;
        struct list_head         aa_exts;
};

#define osc_grant_args osc_brw_async_args
struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f     fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};
struct osc_enqueue_args {
        struct obd_export       *oa_exp;
        enum ldlm_type           oa_type;
        enum ldlm_mode           oa_mode;
        __u64                   *oa_flags;
        osc_enqueue_upcall_f     oa_upcall;
        void                    *oa_cookie;
        struct ost_lvb          *oa_lvb;
        struct lustre_handle     oa_lockh;
        unsigned int             oa_agl:1;
};
static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc;

        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do MDS-to-OST setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
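/*
 * Usage sketch (caller names are hypothetical): a caller that wants the
 * setattr issued immediately, with completion reported through its own
 * callback, would do
 *
 *	rc = osc_setattr_async(exp, oa, my_upcall, my_cookie, PTLRPCD_SET);
 *
 * Passing rqset == NULL fire-and-forgets the RPC, while a real request
 * set defers sending until the set is waited on.
 */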
static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}
/*
 * If rqset is NULL, do not wait for response. Upcall and cookie could also
 * be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct osc_ladvise_args *la;
        int rc;
        struct lu_ladvise *req_ladvise;
        struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
        int num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr *req_ladvise_hdr;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
                RETURN(0);
        }

        req->rq_interpret_reply = osc_ladvise_interpret;
        CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
        la = ptlrpc_req_async_args(req);
        la->la_oa = oa;
        la->la_upcall = upcall;
        la->la_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc;

        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
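/*
 * Note (assumption based on how OST_SYNC below overloads the same fields):
 * callers encode the byte range to punch in the obdo, with oa->o_size as
 * the start and oa->o_blocks as the end of the range, so OST_PUNCH needs
 * no additional request fields.
 */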
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);

out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct osc_fsync_args *fa;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
/* Find and cancel locally the locks matched by @mode in the resource found
 * by the object id in @oa. Found locks are added to the @cancels list.
 * Returns the number of locks added to the list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes it from the case when ELC is not supported
         * originally, when we still want to cancel locks in advance and just
         * cancel them locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
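/*
 * Example of use: osc_destroy() below calls this with LCK_PW and
 * LDLM_FL_DISCARD_DATA so that conflicting locks are cancelled locally and
 * the cancel records ride along in the OST_DESTROY RPC via
 * ldlm_prep_elc_req() (early lock cancel), saving separate cancel RPCs.
 */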
static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        RETURN(0);
}
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
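/*
 * The increment-then-test pattern above tentatively takes a slot with
 * atomic_inc_return() and gives it back if that would oversubscribe
 * cl_max_rpcs_in_flight. The second test, after atomic_dec_return(),
 * re-checks because another sender may have freed a slot between the two
 * atomic operations; waking cl_destroy_waitq there keeps osc_destroy()
 * sleepers from being stranded.
 */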
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct list_head cancels = LIST_HEAD_INIT(cancels);
        int rc, count;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() and atomic_inc() are not covered by a
                 * lock, so they may race and trip this CERROR() unless we
                 * add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                oa->o_undirty = nrpages << PAGE_CACHE_SHIFT;
                if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
                                 GRANT_PARAM)) {
                        u64 nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                    cli->cl_max_extent_pages;
                        oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
                }
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
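/*
 * Worked example (illustrative values only): with cl_max_pages_per_rpc =
 * 256, cl_max_rpcs_in_flight = 8 and 4 KiB pages, the fallback branch
 * above computes
 *
 *	nrpages   = 256 * (8 + 1)            = 2304 pages
 *	o_undirty = 2304 << PAGE_CACHE_SHIFT = 9 MiB
 *
 * (or cl_dirty_max_pages worth, if that is larger), plus one
 * cl_grant_extent_tax per prospective extent when GRANT_PARAM is set.
 */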
void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}
static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}
/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
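/*
 * Example (illustrative numbers): with cl_max_pages_per_rpc = 256,
 * cl_max_rpcs_in_flight = 8 and 4 KiB pages, one RPC is worth 1 MiB, so
 * the first shrink targets (8 + 1) * 1 MiB = 9 MiB; once cl_avail_grant
 * is already at or below that, the next shrink targets a single RPC's
 * worth, 1 MiB.
 */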
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int rc = 0;
        struct ost_body *body;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}
static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n", cli_name(client), rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expected to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                cli->cl_avail_grant -= cli->cl_reserved_grant;
                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        cli->cl_avail_grant -= cli->cl_dirty_grant;
                else
                        cli->cl_avail_grant -=
                                cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        }

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli_name(cli), cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * ... */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = size >> PAGE_CACHE_SHIFT;
                if (cli->cl_max_extent_pages == 0)
                        cli->cl_max_extent_pages = 1;
        } else {
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_CACHE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld "
               "chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
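/*
 * Example (assumed server geometry): an ldiskfs OST with 4 KiB blocks
 * reports ocd_grant_blkbits = 12, so cl_chunkbits becomes
 * max(PAGE_CACHE_SHIFT, 12) and the client-side extent chunk matches the
 * server block size; cl_max_extent_pages is likewise derived from
 * ocd_grant_max_blks << ocd_grant_blkbits.
 */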
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                              (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}
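/*
 * Example (assuming 4 KiB pages, each brw_page covering a full page): a
 * 3-page read that comes back with nob_read = 5000 leaves page 0 intact
 * (4096 bytes read), zeroes bytes 904..4095 of page 1 in the first loop,
 * and zeroes all of page 2 in the second loop.
 */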
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int i;
        __u32 *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC | OBD_BRW_ASYNC |
                                  OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.hpdd.intel.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
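/*
 * Note that any flag difference prevents merging (the early return 0
 * above); the mask only controls whether the combination is unexpected
 * enough to warn about. Otherwise two pages coalesce into one niobuf
 * exactly when they are byte-contiguous: p1->off + p1->count == p2->off.
 */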
static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             cksum_type_t cksum_type)
{
        u32 cksum;
        int i = 0;
        struct cfs_crypto_hash_desc *hdesc;
        unsigned int bufsize;
        int err;
        unsigned char cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute a wrong checksum instead of corrupting
         * the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body *body;
        struct obd_ioobj *ioobj;
        struct niobuf_remote *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule *pill;
        struct brw_page *pg_prev;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK) |
                PTLRPC_BULK_BUF_KIOV,
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request. The actual number is
         * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
         * sends "max - 1" for old client compatibility sending "0", and also
         * so that the actual maximum is a power-of-two number, not one less.
         * LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        LASSERT(page_count > 0);

        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len = pg->count;
                        niobuf->rnb_flags = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);
        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
               req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
               niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                size_t page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        u32 client_cksum = 0;

        if (rc < 0 && rc != -EDQUOT) {
                DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
                RETURN(rc);
        }

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
                unsigned int qid[LL_MAXQUOTAS] =
                        {body->oa.o_uid, body->oa.o_gid};

                CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
                       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
                       body->oa.o_flags);
                osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
        }

        osc_update_grant(cli, body);

        if (rc < 0)
                RETURN(rc);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags)))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */

        /* if unwrap_bulk failed, return -EAGAIN to retry */
        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
        if (rc < 0)
                GOTO(out, rc = -EAGAIN);

        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);
                RETURN(-EPROTO);
        }

        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                u32 server_cksum = body->oa.o_cksum;
                char *via = "";
                char *router = "";
                cksum_type_t cksum_type;

                cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
                                               body->oa.o_flags : 0);
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
                                                 cksum_type);

                if (peer->nid != req->rq_bulk->bd_sender) {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inode "DFID" object "DOSTID
                                           " extent ["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_seq : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_oid : 0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_ver : 0,
                                           POSTID(&body->oa.o_oi),
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                           1);
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        if (rc >= 0)
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oa, &body->oa);

        RETURN(rc);
}
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;

        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                  OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa, aa->aa_page_count,
                                  aa->aa_ppga, &new_req, 1);
        if (rc)
                RETURN(rc);

        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* XXX: This code will run into problems if we ever want to add a
         * series of BRW RPCs into a self-defined ptlrpc_request_set and wait
         * for all of them to be finished. We should inherit the request
         * set from the old request. */
        ptlrpcd_add_req(new_req);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
/*
 * Ugh, we want disk allocation on the target to happen in offset order. We'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll
 * do fine for our small page arrays and doesn't require allocation. It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
{
        int stride;
        struct brw_page *tmp;
        int i, j;

        if (num == 1)
                return;
        for (stride = 1; stride < num ; stride = (stride * 3) + 1)
                ;

        do {
                stride /= 3;
                for (i = stride ; i < num ; i++) {
                        tmp = array[i];
                        j = i;
                        while (j >= stride && array[j - stride]->off > tmp->off) {
                                array[j] = array[j - stride];
                                j -= stride;
                        }
                        array[j] = tmp;
                }
        } while (stride > 1);
}
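/*
 * The stride sequence is Knuth's 3h+1 series: 1, 4, 13, 40, 121, ...
 * The first loop finds the smallest stride >= num, then the do/while
 * divides by 3 before each pass; e.g. for num = 16 the passes run with
 * strides 13, 4 and finally 1.
 */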
static void osc_release_ppga(struct brw_page **ppga, size_t count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When the server returns -EINPROGRESS, the client should always
         * retry regardless of the number of times the bulk was resent
         * already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                           client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (rc == 0) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
                struct cl_object *obj;
                struct osc_async_page *last;

                last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
                obj = osc2cl(last->oap_obj);

                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }

                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        loff_t last_off = last->oap_count + last->oap_obj_off +
                                          last->oap_page_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size */
                        if (loi->loi_lvb.lvb_size < last_off) {
                                attr->cat_size = last_off;
                                valid |= CAT_SIZE;
                        }
                        /* Extend KMS if it's not a lockless write */
                        if (loi->loi_kms < last_off &&
                            oap2osc_page(last)->ops_srvlock == 0) {
                                attr->cat_kms = last_off;
                                valid |= CAT_KMS;
                        }
                }

                if (valid != 0)
                        cl_object_attr_update(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        OBDO_FREE(aa->aa_oa);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);

        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        spin_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        spin_unlock(&cli->cl_loi_list_lock);

        osc_io_unplug(env, cli, NULL);
        RETURN(rc);
}
static void brw_commit(struct ptlrpc_request *req)
{
        /* If osc_inc_unstable_pages (via osc_extent_finish) races with
         * this being called via the rq_commit_cb, we need to ensure
         * osc_dec_unstable_pages is still called. Otherwise unstable
         * pages may be leaked. */
        spin_lock(&req->rq_lock);
        if (likely(req->rq_unstable)) {
                req->rq_unstable = 0;
                spin_unlock(&req->rq_lock);

                osc_dec_unstable_pages(req);
        } else {
                req->rq_committed = 1;
                spin_unlock(&req->rq_lock);
        }
}
/**
 * Build an RPC from the list of extents @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct list_head *ext_list, int cmd)
{
        struct ptlrpc_request           *req = NULL;
        struct osc_extent               *ext;
        struct brw_page                 **pga = NULL;
        struct osc_brw_async_args       *aa = NULL;
        struct obdo                     *oa = NULL;
        struct osc_async_page           *oap;
        struct osc_object               *obj = NULL;
        struct cl_req_attr              *crattr = NULL;
        loff_t                          starting_offset = OBD_OBJECT_EOF;
        loff_t                          ending_offset = 0;
        int                             mpflag = 0;
        int                             mem_tight = 0;
        int                             page_count = 0;
        bool                            soft_sync = false;
        bool                            interrupted = false;
        int                             i;
        int                             grant = 0;
        int                             rc;
        struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
        struct ost_body                 *body;

        LASSERT(!list_empty(ext_list));
        /* add pages into rpc_list to build BRW rpc */
        list_for_each_entry(ext, ext_list, oe_link) {
                LASSERT(ext->oe_state == OES_RPC);
                mem_tight |= ext->oe_memalloc;
                grant += ext->oe_grants;
                page_count += ext->oe_nr_pages;
                if (obj == NULL)
                        obj = ext->oe_obj;
        }

        soft_sync = osc_over_unstable_soft_limit(cli);
        if (mem_tight)
                mpflag = cfs_memory_pressure_get_and_set();

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, rc = -ENOMEM);

        i = 0;
        list_for_each_entry(ext, ext_list, oe_link) {
                list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                        if (mem_tight)
                                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
                        if (soft_sync)
                                oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
                        pga[i] = &oap->oap_brw_page;
                        pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                        i++;

                        list_add_tail(&oap->oap_rpc_item, &rpc_list);
                        if (starting_offset == OBD_OBJECT_EOF ||
                            starting_offset > oap->oap_obj_off)
                                starting_offset = oap->oap_obj_off;
                        else
                                LASSERT(oap->oap_page_off == 0);
                        if (ending_offset < oap->oap_obj_off + oap->oap_count)
                                ending_offset = oap->oap_obj_off +
                                                oap->oap_count;
                        else
                                LASSERT(oap->oap_page_off + oap->oap_count ==
                                        PAGE_CACHE_SIZE);
                        if (oap->oap_interrupted)
                                interrupted = true;
                }
        }

        /* first page in the list */
        oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);

        crattr = &osc_env_info(env)->oti_req_attr;
        memset(crattr, 0, sizeof(*crattr));
        crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
        crattr->cra_flags = ~0ULL;
        crattr->cra_page = oap2cl_page(oap);
        crattr->cra_oa = oa;
        cl_req_attr_set(env, osc2cl(obj), crattr);

        if (cmd == OBD_BRW_WRITE)
                oa->o_grant_used = grant;

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, rc);
        }

        req->rq_commit_cb = brw_commit;
        req->rq_interpret_reply = brw_interpret;
        req->rq_memalloc = mem_tight != 0;
        oap->oap_request = ptlrpc_request_addref(req);
        if (interrupted && !req->rq_intr)
                ptlrpc_mark_interrupted(req);

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST). If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps. Sadly, there is no obvious
         * way to do this in a single call. bug 10150 */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        crattr->cra_oa = &body->oa;
        crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
        cl_req_attr_set(env, osc2cl(obj), crattr);
        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice_init(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_exts);
        list_splice_init(ext_list, &aa->aa_exts);
        spin_lock(&cli->cl_loi_list_lock);
        starting_offset >>= PAGE_CACHE_SHIFT;
        if (cmd == OBD_BRW_READ) {
                cli->cl_r_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      starting_offset + 1);
        } else {
                cli->cl_w_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset + 1);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);

        ptlrpcd_add_req(req);
        rc = 0;

out:
        if (mem_tight != 0)
                cfs_memory_pressure_restore(mpflag);

        if (rc != 0) {
                LASSERT(req == NULL);

                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                while (!list_empty(ext_list)) {
                        ext = list_entry(ext_list->next, struct osc_extent,
                                         oe_link);
                        list_del_init(&ext->oe_link);
                        osc_extent_finish(env, ext, 0, rc);
                }
        }
        RETURN(rc);
}
static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
{
        int set = 0;

        LASSERT(lock != NULL);

        lock_res_and_lock(lock);

        if (lock->l_ast_data == NULL)
                lock->l_ast_data = data;
        if (lock->l_ast_data == data)
                set = 1;

        unlock_res_and_lock(lock);

        return set;
}
static int osc_enqueue_fini(struct ptlrpc_request *req,
                            osc_enqueue_upcall_f upcall, void *cookie,
                            struct lustre_handle *lockh, enum ldlm_mode mode,
                            __u64 *flags, int agl, int errcode)
{
        bool intent = *flags & LDLM_FL_HAS_INTENT;
        int rc;

        /* The request was created before ldlm_cli_enqueue call. */
        if (intent && errcode == ELDLM_LOCK_ABORTED) {
                struct ldlm_reply *rep;

                rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
                LASSERT(rep != NULL);

                rep->lock_policy_res1 =
                        ptlrpc_status_ntoh(rep->lock_policy_res1);
                if (rep->lock_policy_res1)
                        errcode = rep->lock_policy_res1;
                if (!agl)
                        *flags |= LDLM_FL_LVB_READY;
        } else if (errcode == ELDLM_OK) {
                *flags |= LDLM_FL_LVB_READY;
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, lockh, errcode);

        /* release the reference taken in ldlm_cli_enqueue() */
        if (errcode == ELDLM_LOCK_MATCHED)
                errcode = ELDLM_OK;
        if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
                ldlm_lock_decref(lockh, mode);

        RETURN(rc);
}
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        enum ldlm_mode mode = aa->oa_mode;
        struct ost_lvb *lvb = aa->oa_lvb;
        __u32 lvb_len = sizeof(*lvb);
        __u64 flags = 0;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(lockh);
        LASSERTF(lock != NULL,
                 "lockh "LPX64", req %p, aa %p - client evicted?\n",
                 lockh->cookie, req, aa);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(lockh, mode);

        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

        /* Let the CP AST grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        if (aa->oa_agl) {
                LASSERT(aa->oa_lvb == NULL);
                LASSERT(aa->oa_flags == NULL);
                aa->oa_flags = &flags;
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
                                   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
                                   lockh, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
                              aa->oa_flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        ldlm_lock_decref(lockh, mode);
        LDLM_LOCK_PUT(lock);
        RETURN(rc);
}
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
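/*
 * PTLRPCD_SET is a sentinel pointer, not a usable request set: callers
 * pass it (see the rqset checks throughout this file) to ask that the
 * request be handed directly to the ptlrpcd daemon via ptlrpcd_add_req()
 * instead of being queued on a caller-owned ptlrpc_request_set.
 */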
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock
 * with other synchronous requests, however keeping some locks and trying to
 * obtain others may take a considerable amount of time in the case of OST
 * failure; and when other sync requests do not get a lock released by a
 * client, the client is evicted from the cluster -- such scenarios make life
 * difficult, so we release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, union ldlm_policy_data *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     osc_enqueue_upcall_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct lustre_handle lockh = { 0 };
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        __u64 match_flags = *flags;
        enum ldlm_mode mode;
        int rc;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother. */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock. The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        if (agl == 0)
                match_flags |= LDLM_FL_LVB_READY;
        if (intent != 0)
                match_flags |= LDLM_FL_BLOCK_GRANTED;
        mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
                               einfo->ei_type, policy, mode, &lockh, 0);
        if (mode) {
                struct ldlm_lock *matched;

                if (*flags & LDLM_FL_TEST_LOCK)
                        RETURN(ELDLM_OK);

                matched = ldlm_handle2lock(&lockh);
                if (agl) {
                        /* AGL enqueues DLM locks speculatively. Therefore if
                         * a DLM lock already exists, just inform the caller
                         * to cancel the AGL process for this stripe. */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
                        *flags |= LDLM_FL_LVB_READY;

                        /* We already have a lock, and it's referenced. */
                        (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
2134 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
        if (intent) {
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof(*lvb));
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, &lockh, async);
        if (async) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_exp = exp;
                        aa->oa_mode = einfo->ei_mode;
                        aa->oa_type = einfo->ei_type;
                        lustre_handle_copy(&aa->oa_lockh, &lockh);
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_agl = !!agl;
                        if (!agl) {
                                aa->oa_flags = flags;
                                aa->oa_lvb = lvb;
                        } else {
                                /* AGL is essentially to enqueue a DLM lock
                                 * in advance, so we don't care about the
                                 * result of the AGL enqueue. */
                                aa->oa_lvb = NULL;
                                aa->oa_flags = NULL;
                        }

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
                              flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
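/* Usage sketch (hypothetical caller, for illustration only): a synchronous,
 * non-speculative enqueue passes async = 0 and agl = 0, e.g.
 *
 *      __u64 flags = LDLM_FL_HAS_INTENT;
 *
 *      rc = osc_enqueue_base(exp, &res_id, &flags, &policy, &lvb, kms_valid,
 *                            upcall, cookie, &einfo, NULL, 0, 0);
 *
 * and the upcall has already run by the time the call returns, either from
 * the ldlm_lock_match() fast path above or from osc_enqueue_fini(). */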
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                   enum ldlm_type type, union ldlm_policy_data *policy,
                   enum ldlm_mode mode, __u64 *flags, void *data,
                   struct lustre_handle *lockh, int unref)
{
        struct obd_device *obd = exp->exp_obd;
        __u64 lflags = *flags;
        enum ldlm_mode rc;
        ENTRY;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
                RETURN(-EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother. */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock. The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        rc = mode;
        if (rc == LCK_PR)
                rc |= LCK_PW;
        rc = ldlm_lock_match(obd->obd_namespace, lflags,
                             res_id, type, policy, rc, lockh, unref);
        if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
                RETURN(rc);

        if (data != NULL) {
                struct ldlm_lock *lock = ldlm_handle2lock(lockh);

                LASSERT(lock != NULL);
                if (!osc_set_lock_data(lock, data)) {
                        ldlm_lock_decref(lockh, rc);
                        rc = 0;
                }
                LDLM_LOCK_PUT(lock);
        }
        RETURN(rc);
}
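/* Unlike osc_enqueue_base(), osc_match_base() never issues an RPC: it only
 * searches the local namespace via ldlm_lock_match(). Readers are matched
 * against PW locks too, by widening LCK_PR to LCK_PR | LCK_PW above. */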
static int osc_statfs_interpret(const struct lu_env *env,
                                struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up. */
                RETURN(rc);

        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL)
                GOTO(out, rc = -EPROTO);

        *aa->aa_oi->oi_osfs = *msfs;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, __u64 max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit). Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests should not wait for a statfs reply,
                 * to avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct obd_statfs *msfs;
        struct ptlrpc_request *req;
        struct obd_import *imp = NULL;
        int rc;
        ENTRY;

        /* Since the request might also come from lprocfs, we need to
         * sync this with client_disconnect_export (Bug15684). */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (imp == NULL)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit). Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests should not wait for a statfs reply,
                 * to avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL)
                GOTO(out, rc = -EPROTO);

        *osfs = *msfs;

        EXIT;
out:
        ptlrpc_req_finished(req);
        return rc;
}
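/* The two entry points above form a pair: osc_statfs() blocks in
 * ptlrpc_queue_wait(), while osc_statfs_async() queues the request on the
 * caller's set and lets osc_statfs_interpret() deliver the result through
 * oi_cb_up(). In both paths OBD_STATFS_NODELAY marks the request no_resend/
 * no_delay so procfs readers fail fast instead of hanging on a dead OST. */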
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void __user *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("%s: cannot get module '%s'\n", obd->obd_name,
                       module_name(THIS_MODULE));
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1, 0);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, current_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
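/* osc_set_info_async() below handles a few keys entirely on the client
 * (checksum toggle, sptlrpc config/context flush, cl_cache setup and LRU
 * shrinking) and forwards everything else to the OST as an OST_SET_INFO
 * RPC; KEY_GRANT_SHRINK requests go to ptlrpcd with an interpret callback. */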
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        char *tmp;
        int rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                cl_cache_incref(cli->cl_cache);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                *(long *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
         * methods directly and everybody is supposed to go through LOV, we
         * assume lov checked invalid values for us.
         * The only recognised values so far are evict_by_nid and mds_conn.
         * Even if something bad goes through, we'd get a -EINVAL from OST
         * anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                        &RQF_OST_SET_GRANT_INFO :
                                        &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else {
                ptlrpcd_add_req(req);
        }

        RETURN(0);
}
static int osc_reconnect(const struct lu_env *env,
                         struct obd_export *exp, struct obd_device *obd,
                         struct obd_uuid *cluuid,
                         struct obd_connect_data *data,
                         void *localdata)
{
        struct client_obd *cli = &obd->u.cli;

        if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
                long lost_grant;
                long grant;

                spin_lock(&cli->cl_loi_list_lock);
                grant = cli->cl_avail_grant + cli->cl_reserved_grant;
                if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
                        grant += cli->cl_dirty_grant;
                else
                        grant += cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
                data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
                lost_grant = cli->cl_lost_grant;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
                       " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
                       data->ocd_version, data->ocd_grant, lost_grant);
        }

        RETURN(0);
}
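/* Illustration (hypothetical numbers): with cl_avail_grant = 1 MiB,
 * cl_reserved_grant = 0 and 256 dirty 4 KiB pages (no
 * OBD_CONNECT_GRANT_PARAM), the client asks to reconnect with
 * ocd_grant = 1 MiB + 1 MiB = 2 MiB; if every counter is zero it falls
 * back to 2 * cli_brw_size(obd). */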
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        int rc;

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *       add this client to shrink list
         *                                      cleanup_osc
         * Bang! the pinger triggers the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
        struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
{
        struct lu_env *env = arg;
        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
        struct ldlm_lock *lock;
        struct osc_object *osc = NULL;
        ENTRY;

        lock_res(res);
        list_for_each_entry(lock, &res->lr_granted, l_res_link) {
                if (lock->l_ast_data != NULL && osc == NULL) {
                        osc = lock->l_ast_data;
                        cl_object_get(osc2cl(osc));
                }

                /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
                 * by the 2nd round of ldlm_namespace_clean() call in
                 * osc_import_event(). */
                ldlm_clear_cleaned(lock);
        }
        unlock_res(res);

        if (osc != NULL) {
                osc_object_invalidate(env, osc);
                cl_object_put(env, osc2cl(osc));
        }

        RETURN(0);
}
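/* This iterator runs under cfs_hash_for_each_nolock() from the
 * IMP_EVENT_INVALIDATE path in osc_import_event() below: it takes one
 * osc_object reference per resource and clears LDLM_FL_CLEANED so the
 * second ldlm_namespace_cleanup() pass can still cancel those locks. */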
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                cli = &obd->u.cli;
                spin_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env *env;
                __u16 refcheck;

                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        osc_io_unplug(env, &obd->u.cli, NULL);
                        cfs_hash_for_each_nolock(ns->ns_rs_hash,
                                                 osc_ldlm_resource_invalidate,
                                                 env, 0);
                        cl_env_put(env, &refcheck);
                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                } else {
                        rc = PTR_ERR(env);
                }
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =
                                OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
/**
 * Determine whether the lock can be canceled before replaying the lock
 * during recovery, see bug16774 for detailed information.
 *
 * \retval zero  the lock can't be canceled
 * \retval other ok to cancel
 */
static int osc_cancel_weight(struct ldlm_lock *lock)
{
        /*
         * Cancel all unused and granted extent locks.
         */
        if (lock->l_resource->lr_type == LDLM_EXTENT &&
            lock->l_granted_mode == lock->l_req_mode &&
            osc_ldlm_weigh_ast(lock) == 0)
                RETURN(1);

        RETURN(0);
}
static int brw_queue_work(const struct lu_env *env, void *data)
{
        struct client_obd *cli = data;

        CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);

        osc_io_unplug(env, cli, NULL);
        RETURN(0);
}
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type *type;
        void *handler;
        int rc;
        int adding;
        int added;
        int req_count;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will attach its proc tree
         * to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_register(obd->obd_name,
                                                       type->typ_procsym,
                                                       obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * let's do the rest. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /*
         * We try to control the total number of requests with an upper limit
         * osc_reqpool_maxreqcount. There might be some race which will cause
         * over-limit allocation, but it is fine.
         */
        req_count = atomic_read(&osc_pool_req_count);
        if (req_count < osc_reqpool_maxreqcount) {
                adding = cli->cl_max_rpcs_in_flight + 2;
                if (req_count + adding > osc_reqpool_maxreqcount)
                        adding = osc_reqpool_maxreqcount - req_count;

                added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
                atomic_add(added, &osc_pool_req_count);
        }

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);

        spin_lock(&osc_shrink_lock);
        list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
        spin_unlock(&osc_shrink_lock);

        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
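/* Pool top-up illustration (hypothetical numbers): with
 * osc_reqpool_maxreqcount = 5 (see osc_init() below) and an empty pool,
 * a client with cl_max_rpcs_in_flight = 8 computes adding = 8 + 2 = 10,
 * clamps it to 5 - req_count, and only accounts the number of requests
 * ptlrpc_add_rqs_to_pool() actually managed to allocate. */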
static int osc_precleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        ENTRY;

        /*
         * for echo client, export may be on zombie list, wait for
         * zombie thread to cull it, because cli.cl_import will be
         * cleared in client_disconnect_export():
         *   class_export_destroy() -> obd_cleanup() ->
         *   echo_device_free() -> echo_client_cleanup() ->
         *   obd_disconnect() -> osc_disconnect() ->
         *   client_disconnect_export()
         */
        obd_zombie_barrier();
        if (cli->cl_writeback_work) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }

        if (cli->cl_lru_work) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }

        obd_cleanup_client_import(obd);
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);
        RETURN(0);
}
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;
        ENTRY;

        spin_lock(&osc_shrink_lock);
        list_del(&cli->cl_shrink_list);
        spin_unlock(&osc_shrink_lock);

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                cl_cache_decref(cli->cl_cache);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);

        return rc > 0 ? 0 : rc;
}

static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
{
        return osc_process_config_base(obd, buf);
}
static struct obd_ops osc_obd_ops = {
        .o_owner          = THIS_MODULE,
        .o_setup          = osc_setup,
        .o_precleanup     = osc_precleanup,
        .o_cleanup        = osc_cleanup,
        .o_add_conn       = client_import_add_conn,
        .o_del_conn       = client_import_del_conn,
        .o_connect        = client_connect_import,
        .o_reconnect      = osc_reconnect,
        .o_disconnect     = osc_disconnect,
        .o_statfs         = osc_statfs,
        .o_statfs_async   = osc_statfs_async,
        .o_create         = osc_create,
        .o_destroy        = osc_destroy,
        .o_getattr        = osc_getattr,
        .o_setattr        = osc_setattr,
        .o_iocontrol      = osc_iocontrol,
        .o_set_info_async = osc_set_info_async,
        .o_import_event   = osc_import_event,
        .o_process_config = osc_process_config,
        .o_quotactl       = osc_quotactl,
};
static struct shrinker *osc_cache_shrinker;
struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
DEFINE_SPINLOCK(osc_shrink_lock);
#ifndef HAVE_SHRINKER_COUNT
static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
{
        struct shrink_control scv = {
                .nr_to_scan = shrink_param(sc, nr_to_scan),
                .gfp_mask   = shrink_param(sc, gfp_mask)
        };
#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
        struct shrinker *shrinker = NULL;
#endif

        (void)osc_cache_shrink_scan(shrinker, &scv);

        return osc_cache_shrink_count(shrinker, &scv);
}
#endif
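/* Compat note: kernels without the split count/scan shrinker API
 * (HAVE_SHRINKER_COUNT undefined) use a single callback for both jobs,
 * so the wrapper above runs the scan pass and then reports the remaining
 * object count. */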
static int __init osc_init(void)
{
        bool enable_proc = true;
        struct obd_type *type;
        unsigned int reqpool_size;
        unsigned int reqsize;
        int rc;
        DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
                         osc_cache_shrink_count, osc_cache_shrink_scan);
        ENTRY;

        /* print an address of _any_ initialized kernel symbol from this
         * module, to allow debugging with gdb that doesn't support data
         * symbols from modules. */
        CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

        type = class_search_type(LUSTRE_OSP_NAME);
        if (type != NULL && type->typ_procsym != NULL)
                enable_proc = false;

        rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
                                 LUSTRE_OSC_NAME, &osc_device_type);
        if (rc)
                GOTO(out_kmem, rc);

        osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);

        /* This is obviously too much memory, only prevent overflow here */
        if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
                GOTO(out_type, rc = -EINVAL);

        reqpool_size = osc_reqpool_mem_max << 20;

        reqsize = 1;
        while (reqsize < OST_IO_MAXREQSIZE)
                reqsize = reqsize << 1;

        /*
         * We don't enlarge the request count in OSC pool according to
         * cl_max_rpcs_in_flight. The allocation from the pool will only be
         * tried after normal allocation failed. So a small OSC pool won't
         * cause much performance degradation in most of cases.
         */
        osc_reqpool_maxreqcount = reqpool_size / reqsize;

        atomic_set(&osc_pool_req_count, 0);
        osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
                                          ptlrpc_add_rqs_to_pool);

        if (osc_rq_pool != NULL)
                GOTO(out, rc);
        rc = -ENOMEM;
out_type:
        class_unregister_type(LUSTRE_OSC_NAME);
out_kmem:
        lu_kmem_fini(osc_caches);
out:
        RETURN(rc);
}
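/* Sizing illustration (hypothetical value): if OST_IO_MAXREQSIZE were
 * 640 KiB, the loop above would round reqsize up to the next power of
 * two, 1 MiB, so the default osc_reqpool_mem_max of 5 MiB would cap the
 * pool at osc_reqpool_maxreqcount = 5 requests. */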
static void __exit osc_exit(void)
{
        remove_shrinker(osc_cache_shrinker);
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
        ptlrpc_free_rq_pool(osc_rq_pool);
}

MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);