4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2015, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
41 #include <lustre/lustre_user.h>
43 #include <lprocfs_status.h>
44 #include <lustre_debug.h>
45 #include <lustre_dlm.h>
46 #include <lustre_fid.h>
47 #include <lustre_ha.h>
48 #include <lustre_ioctl.h>
49 #include <lustre_net.h>
50 #include <lustre_obdo.h>
51 #include <lustre_param.h>
53 #include <obd_cksum.h>
54 #include <obd_class.h>
56 #include "osc_cl_internal.h"
57 #include "osc_internal.h"
/* Module-wide request-pool state: a request counter, a maximum request
 * count and the pool pointer itself (osc_rq_pool is passed to
 * ptlrpc_request_alloc_pool() for BRW writes further down, judging by the
 * visible call).  NOTE(review): where these are initialised is not visible
 * in this part of the file. */
59 atomic_t osc_pool_req_count;
60 unsigned int osc_reqpool_maxreqcount;
61 struct ptlrpc_request_pool *osc_rq_pool;
63 /* max memory used for request pool, unit is MB */
64 static unsigned int osc_reqpool_mem_max = 5;
/* Registered as a module parameter with permission bits 0444. */
65 module_param(osc_reqpool_mem_max, uint, 0444);
/* Per-request state for bulk read/write (BRW) RPCs.  osc_brw_prep_request()
 * obtains it from ptlrpc_req_async_args() and fills it in.
 * NOTE(review): only some members are visible in this listing. */
67 struct osc_brw_async_args {
73 struct brw_page **aa_ppga;
74 struct client_obd *aa_cli;
75 struct list_head aa_oaps;
76 struct list_head aa_exts;
/* Grant-shrink requests reuse the BRW argument layout under another name. */
79 #define osc_grant_args osc_brw_async_args
/* Async arguments used by osc_setattr_async() and osc_punch_base();
 * sa_upcall is invoked from osc_setattr_interpret() with sa_cookie and rc.
 * NOTE(review): the remaining members are not visible in this listing. */
81 struct osc_setattr_args {
83 obd_enqueue_update_f sa_upcall;
/* Async arguments for OST_SYNC requests: the osc object being synced and
 * the upcall that osc_sync_interpret() calls with fa_cookie and rc.
 * NOTE(review): the remaining members are not visible in this listing. */
87 struct osc_fsync_args {
88 struct osc_object *fa_obj;
90 obd_enqueue_update_f fa_upcall;
/* Async arguments for OST_LADVISE requests; la_upcall is called from
 * osc_ladvise_interpret() with la_cookie and rc.
 * NOTE(review): the remaining members are not visible in this listing. */
94 struct osc_ladvise_args {
96 obd_enqueue_update_f la_upcall;
/* Arguments carried with a lock enqueue, judging by the member types:
 * export, LDLM lock type and mode, completion upcall, LVB pointer, lock
 * handle and a one-bit oa_agl flag.
 * NOTE(review): the code that consumes this structure is not visible here. */
100 struct osc_enqueue_args {
101 struct obd_export *oa_exp;
102 enum ldlm_type oa_type;
103 enum ldlm_mode oa_mode;
105 osc_enqueue_upcall_f oa_upcall;
107 struct ost_lvb *oa_lvb;
108 struct lustre_handle oa_lockh;
109 unsigned int oa_agl:1;
/* Forward declarations of static helpers; their definitions are not visible
 * in this part of the file, and the brw_interpret() prototype continues on
 * lines that this listing omits. */
112 static void osc_release_ppga(struct brw_page **ppga, size_t count);
113 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/* Fill the client-side OST body of @req from @oa: look up the RMF_OST_BODY
 * buffer in the request capsule and convert @oa into its wire obdo with
 * lustre_set_wire_obdo(), using the import's connect data. */
116 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
118 struct ost_body *body;
120 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
123 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/* Synchronous OST_GETATTR.  Allocates and packs the request, packs @oa into
 * it, waits for the reply with ptlrpc_queue_wait(), then copies the reply
 * obdo back into @oa and sets o_blksize from cli_brw_size(), marking
 * OBD_MD_FLBLKSZ valid.  The request is released with ptlrpc_req_finished().
 * NOTE(review): the conditions guarding the error paths (request free,
 * -EPROTO) are on lines that this listing omits. */
126 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
129 struct ptlrpc_request *req;
130 struct ost_body *body;
134 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
138 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
140 ptlrpc_request_free(req);
144 osc_pack_req_body(req, oa);
146 ptlrpc_request_set_replen(req);
148 rc = ptlrpc_queue_wait(req);
152 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
154 GOTO(out, rc = -EPROTO);
156 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
157 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* Report the client's BRW size as the preferred block size. */
159 oa->o_blksize = cli_brw_size(exp->exp_obd);
160 oa->o_valid |= OBD_MD_FLBLKSZ;
164 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR.  Asserts that @oa carries OBD_MD_FLGROUP, then
 * allocates and packs the request, packs @oa, waits for the reply and copies
 * the reply obdo back into @oa before releasing the request.
 * NOTE(review): the conditions guarding the error paths are on lines that
 * this listing omits. */
169 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
172 struct ptlrpc_request *req;
173 struct ost_body *body;
177 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
179 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
183 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
185 ptlrpc_request_free(req);
189 osc_pack_req_body(req, oa);
191 ptlrpc_request_set_replen(req);
193 rc = ptlrpc_queue_wait(req);
197 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
199 GOTO(out, rc = -EPROTO);
201 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
205 ptlrpc_req_finished(req);
/* Reply callback for async setattr/punch requests.  Unpacks the reply body,
 * copies its obdo into sa->sa_oa and finally hands rc to sa->sa_upcall
 * together with sa->sa_cookie; the upcall's result becomes the new rc.
 * NOTE(review): the checks that lead to the -EPROTO path are on lines that
 * this listing omits. */
210 static int osc_setattr_interpret(const struct lu_env *env,
211 struct ptlrpc_request *req,
212 struct osc_setattr_args *sa, int rc)
214 struct ost_body *body;
220 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
222 GOTO(out, rc = -EPROTO);
224 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
227 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR.  Builds the request from @oa, then either hands
 * it straight to ptlrpcd without waiting for a response, or installs
 * osc_setattr_interpret() as the reply callback, stores @upcall/@cookie in
 * the request's async args and queues it on ptlrpcd (@rqset == PTLRPCD_SET)
 * or on @rqset.
 * NOTE(review): the condition selecting the "do not wait" path is on a line
 * that this listing omits. */
231 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
232 obd_enqueue_update_f upcall, void *cookie,
233 struct ptlrpc_request_set *rqset)
235 struct ptlrpc_request *req;
236 struct osc_setattr_args *sa;
241 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
245 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
247 ptlrpc_request_free(req);
251 osc_pack_req_body(req, oa);
253 ptlrpc_request_set_replen(req);
255 /* do mds to ost setattr asynchronously */
257 /* Do not wait for response. */
258 ptlrpcd_add_req(req);
260 req->rq_interpret_reply =
261 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* Compile-time check that the args fit in the request's async-args area. */
263 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
264 sa = ptlrpc_req_async_args(req);
266 sa->sa_upcall = upcall;
267 sa->sa_cookie = cookie;
269 if (rqset == PTLRPCD_SET)
270 ptlrpcd_add_req(req);
272 ptlrpc_set_add_req(rqset, req);
/* Reply callback for OST_LADVISE.  Unpacks the reply body, copies its obdo
 * into *la->la_oa by structure assignment and then calls la->la_upcall with
 * la->la_cookie and rc; the upcall's result becomes the new rc.
 * NOTE(review): the checks that lead to the -EPROTO path are on lines that
 * this listing omits. */
278 static int osc_ladvise_interpret(const struct lu_env *env,
279 struct ptlrpc_request *req,
282 struct osc_ladvise_args *la = arg;
283 struct ost_body *body;
289 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291 GOTO(out, rc = -EPROTO);
293 *la->la_oa = body->oa;
295 rc = la->la_upcall(la->la_cookie, rc);
/* Send an OST_LADVISE request carrying @ladvise_hdr and its lah_count advice
 * entries.  The RMF_OST_LADVISE buffer is sized for num_advise entries
 * before packing; the header and the advice array are then copied into the
 * request with memcpy().  The request is sent on OST_IO_PORTAL. */
300 * If rqset is NULL, do not wait for response. Upcall and cookie could also
301 * be NULL in this case
303 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
304 struct ladvise_hdr *ladvise_hdr,
305 obd_enqueue_update_f upcall, void *cookie,
306 struct ptlrpc_request_set *rqset)
308 struct ptlrpc_request *req;
309 struct ost_body *body;
310 struct osc_ladvise_args *la;
312 struct lu_ladvise *req_ladvise;
313 struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
314 int num_advise = ladvise_hdr->lah_count;
315 struct ladvise_hdr *req_ladvise_hdr;
318 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
322 req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
323 num_advise * sizeof(*ladvise));
324 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
326 ptlrpc_request_free(req);
329 req->rq_request_portal = OST_IO_PORTAL;
330 ptlrpc_at_set_req_timeout(req);
332 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
334 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
337 req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
338 &RMF_OST_LADVISE_HDR);
339 memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
341 req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
342 memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
343 ptlrpc_request_set_replen(req);
346 /* Do not wait for response. */
347 ptlrpcd_add_req(req);
351 req->rq_interpret_reply = osc_ladvise_interpret;
352 CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
353 la = ptlrpc_req_async_args(req);
355 la->la_upcall = upcall;
356 la->la_cookie = cookie;
358 if (rqset == PTLRPCD_SET)
359 ptlrpcd_add_req(req);
361 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_CREATE.  Asserts that @oa carries OBD_MD_FLGROUP and that
 * its object sequence is an echo sequence, so this path is only taken for
 * echo objects.  After the reply arrives, the reply obdo is copied back into
 * @oa, o_blksize is set from cli_brw_size() and the reply transno is logged.
 * NOTE(review): the conditions guarding the error paths are on lines that
 * this listing omits. */
366 static int osc_create(const struct lu_env *env, struct obd_export *exp,
369 struct ptlrpc_request *req;
370 struct ost_body *body;
375 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
376 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
378 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
380 GOTO(out, rc = -ENOMEM);
382 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
384 ptlrpc_request_free(req);
388 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
391 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
393 ptlrpc_request_set_replen(req);
395 rc = ptlrpc_queue_wait(req);
399 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
401 GOTO(out_req, rc = -EPROTO);
403 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
404 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
406 oa->o_blksize = cli_brw_size(exp->exp_obd);
407 oa->o_valid |= OBD_MD_FLBLKSZ;
409 CDEBUG(D_HA, "transno: "LPD64"\n",
410 lustre_msg_get_transno(req->rq_repmsg));
412 ptlrpc_req_finished(req);
/* Asynchronous OST_PUNCH.  Builds the request from @oa, routes it to
 * OST_IO_PORTAL, and reuses osc_setattr_interpret() as the reply callback
 * with @upcall/@cookie stored in the request's async args.  The request is
 * queued on ptlrpcd when @rqset == PTLRPCD_SET, otherwise on @rqset.
 * NOTE(review): the conditions guarding the error paths are on lines that
 * this listing omits. */
417 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
418 obd_enqueue_update_f upcall, void *cookie,
419 struct ptlrpc_request_set *rqset)
421 struct ptlrpc_request *req;
422 struct osc_setattr_args *sa;
423 struct ost_body *body;
427 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
431 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
433 ptlrpc_request_free(req);
436 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
437 ptlrpc_at_set_req_timeout(req);
439 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
441 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
443 ptlrpc_request_set_replen(req);
445 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
446 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
447 sa = ptlrpc_req_async_args(req);
449 sa->sa_upcall = upcall;
450 sa->sa_cookie = cookie;
451 if (rqset == PTLRPCD_SET)
452 ptlrpcd_add_req(req);
454 ptlrpc_set_add_req(rqset, req);
/* Reply callback for OST_SYNC.  Unpacks the reply body (logging an error and
 * taking the -EPROTO path on failure), copies its obdo into *fa->fa_oa, and
 * under the cl_object attribute lock pushes the reply's block count into
 * the osc object's attributes when OBD_MD_FLBLOCKS is valid.  Finally calls
 * fa->fa_upcall with fa->fa_cookie and rc.
 * NOTE(review): the line that sets bits in "valid" is not visible here. */
459 static int osc_sync_interpret(const struct lu_env *env,
460 struct ptlrpc_request *req,
463 struct osc_fsync_args *fa = arg;
464 struct ost_body *body;
465 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
466 unsigned long valid = 0;
467 struct cl_object *obj;
473 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
475 CERROR("can't unpack ost_body\n");
476 GOTO(out, rc = -EPROTO);
479 *fa->fa_oa = body->oa;
480 obj = osc2cl(fa->fa_obj);
482 /* Update osc object's blocks attribute */
483 cl_object_attr_lock(obj);
484 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
485 attr->cat_blocks = body->oa.o_blocks;
490 cl_object_attr_update(env, obj, attr, valid);
491 cl_object_attr_unlock(obj);
494 rc = fa->fa_upcall(fa->fa_cookie, rc);
/* Asynchronous OST_SYNC for @obj.  Builds the request on the object's
 * export, copies @oa into the request body, installs osc_sync_interpret()
 * as the reply callback with @upcall/@cookie in the async args, and queues
 * the request on ptlrpcd (@rqset == PTLRPCD_SET) or on @rqset.
 * NOTE(review): the conditions guarding the error paths and the remaining
 * async-arg assignments are on lines that this listing omits. */
498 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
499 obd_enqueue_update_f upcall, void *cookie,
500 struct ptlrpc_request_set *rqset)
502 struct obd_export *exp = osc_export(obj);
503 struct ptlrpc_request *req;
504 struct ost_body *body;
505 struct osc_fsync_args *fa;
509 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
513 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
515 ptlrpc_request_free(req);
519 /* overload the size and blocks fields in the oa with start/end */
520 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
522 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
524 ptlrpc_request_set_replen(req);
525 req->rq_interpret_reply = osc_sync_interpret;
527 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
528 fa = ptlrpc_req_async_args(req);
531 fa->fa_upcall = upcall;
532 fa->fa_cookie = cookie;
534 if (rqset == PTLRPCD_SET)
535 ptlrpcd_add_req(req);
537 ptlrpc_set_add_req(rqset, req);
542 /* Find and cancel locally locks matched by @mode in the resource named by
543 * the object id in @oa. Found locks are added into @cancels list. Returns the
544 * number of locks added to @cancels list. */
/* The resource name is built from oa->o_oi; a reference on the resource is
 * held only for the duration of ldlm_cancel_resource_local(). */
545 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
546 struct list_head *cancels,
547 enum ldlm_mode mode, __u64 lock_flags)
549 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
550 struct ldlm_res_id res_id;
551 struct ldlm_resource *res;
555 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
556 * export) but disabled through procfs (flag in NS).
558 * This distinguishes from a case when ELC is not supported originally,
559 * when we still want to cancel locks in advance and just cancel them
560 * locally, without sending any RPC. */
561 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
564 ostid_build_res_name(&oa->o_oi, &res_id);
565 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
569 LDLM_RESOURCE_ADDREF(res);
570 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
571 lock_flags, 0, NULL);
572 LDLM_RESOURCE_DELREF(res);
573 ldlm_resource_putref(res);
/* Reply callback for OST_DESTROY: drop the in-flight destroy count of the
 * request's client obd and wake anyone sleeping on cl_destroy_waitq (see the
 * wait in osc_destroy()). */
577 static int osc_destroy_interpret(const struct lu_env *env,
578 struct ptlrpc_request *req, void *data,
581 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
583 atomic_dec(&cli->cl_destroy_in_flight);
584 wake_up(&cli->cl_destroy_waitq);
/* Try to reserve a slot for one destroy RPC.  The in-flight counter is
 * incremented first; the reservation stands if the new value is within
 * cl_max_rpcs_in_flight.  Otherwise the increment is undone, and if the
 * counter has meanwhile dropped below the limit a waiter is woken so it can
 * retry.
 * NOTE(review): the return statements are on lines that this listing omits;
 * osc_destroy() treats a non-zero result as "may send". */
588 static int osc_can_send_destroy(struct client_obd *cli)
590 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
591 cli->cl_max_rpcs_in_flight) {
592 /* The destroy request can be sent */
595 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
596 cli->cl_max_rpcs_in_flight) {
598 * The counter has been modified between the two atomic
601 wake_up(&cli->cl_destroy_waitq);
/* Send an OST_DESTROY for the object described by @oa without waiting for
 * the reply.  Unused LCK_PW locks on the object are first cancelled locally
 * with LDLM_FL_DISCARD_DATA and collected in "cancels"; the request is
 * prepared by ldlm_prep_elc_req().  Before the request is handed to ptlrpcd
 * the caller may sleep until osc_can_send_destroy() grants a slot.
 * NOTE(review): the arguments after OST_DESTROY in the ldlm_prep_elc_req()
 * call and the conditions guarding the error paths are on lines that this
 * listing omits. */
606 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
609 struct client_obd *cli = &exp->exp_obd->u.cli;
610 struct ptlrpc_request *req;
611 struct ost_body *body;
612 struct list_head cancels = LIST_HEAD_INIT(cancels);
617 CDEBUG(D_INFO, "oa NULL\n");
621 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
622 LDLM_FL_DISCARD_DATA);
624 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
626 ldlm_lock_list_put(&cancels, l_bl_ast, count);
630 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
633 ptlrpc_request_free(req);
637 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
638 ptlrpc_at_set_req_timeout(req);
640 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
642 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
644 ptlrpc_request_set_replen(req);
646 req->rq_interpret_reply = osc_destroy_interpret;
647 if (!osc_can_send_destroy(cli)) {
648 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
651 * Wait until the number of on-going destroy RPCs drops
652 * under max_rpc_in_flight
654 l_wait_event_exclusive(cli->cl_destroy_waitq,
655 osc_can_send_destroy(cli), &lwi);
658 /* Do not wait for response */
659 ptlrpcd_add_req(req);
/* Fill the cache/grant accounting fields of @oa from @cli so the server
 * learns the client's state: o_dirty, o_undirty, o_grant and o_dropped.
 * The caller must not have set OBD_MD_FLBLOCKS or OBD_MD_FLGRANT yet (this
 * is asserted).  All reads happen under cl_loi_list_lock.  o_dirty is taken
 * from cl_dirty_grant when the GRANT_PARAM connect flag is set, otherwise it
 * is the dirty page count converted to bytes.  The three sanity checks log
 * an error when the dirty accounting looks inconsistent; otherwise
 * o_undirty is computed from max(pages per RPC * (RPCs in flight + 1),
 * cl_dirty_max_pages).  cl_lost_grant is reported in o_dropped and then
 * reset to zero.
 * NOTE(review): the assignments made in the error branches, and how the
 * third parameter is used, are on lines that this listing omits. */
663 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
666 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
668 LASSERT(!(oa->o_valid & bits));
671 spin_lock(&cli->cl_loi_list_lock);
672 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
673 oa->o_dirty = cli->cl_dirty_grant;
675 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
676 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
677 cli->cl_dirty_max_pages)) {
678 CERROR("dirty %lu - %lu > dirty_max %lu\n",
679 cli->cl_dirty_pages, cli->cl_dirty_transit,
680 cli->cl_dirty_max_pages);
682 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
683 atomic_long_read(&obd_dirty_transit_pages) >
684 (long)(obd_max_dirty_pages + 1))) {
685 /* The atomic_read() allowing the atomic_inc() are
686 * not covered by a lock thus they may safely race and trip
687 * this CERROR() unless we add in a small fudge factor (+1). */
688 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
689 cli_name(cli), atomic_long_read(&obd_dirty_pages),
690 atomic_long_read(&obd_dirty_transit_pages),
691 obd_max_dirty_pages);
693 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
695 CERROR("dirty %lu - dirty_max %lu too big???\n",
696 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
699 unsigned long nrpages;
701 nrpages = cli->cl_max_pages_per_rpc;
702 nrpages *= cli->cl_max_rpcs_in_flight + 1;
703 nrpages = max(nrpages, cli->cl_dirty_max_pages);
704 oa->o_undirty = nrpages << PAGE_CACHE_SHIFT;
705 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
709 /* take extent tax into account when asking for more
711 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
712 cli->cl_max_extent_pages;
713 oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
716 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
717 oa->o_dropped = cli->cl_lost_grant;
718 cli->cl_lost_grant = 0;
719 spin_unlock(&cli->cl_loi_list_lock);
720 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
721 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant-shrink check: set cl_next_shrink_grant to
 * cfs_time_shift(cl_grant_shrink_interval) and log the new value. */
724 void osc_update_next_shrink(struct client_obd *cli)
726 cli->cl_next_shrink_grant =
727 cfs_time_shift(cli->cl_grant_shrink_interval);
728 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
729 cli->cl_next_shrink_grant);
/* Add @grant bytes to the client's available grant while holding
 * cl_loi_list_lock. */
732 static void __osc_update_grant(struct client_obd *cli, u64 grant)
734 spin_lock(&cli->cl_loi_list_lock);
735 cli->cl_avail_grant += grant;
736 spin_unlock(&cli->cl_loi_list_lock);
/* Credit the client with the extra grant carried in a reply body, but only
 * when the reply marks o_grant as valid (OBD_MD_FLGRANT). */
739 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
741 if (body->oa.o_valid & OBD_MD_FLGRANT) {
742 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
743 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: osc_shrink_grant_to_target() calls this before its
 * definition, which is not visible in this part of the file. */
747 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
748 u32 keylen, void *key,
749 u32 vallen, void *val,
750 struct ptlrpc_request_set *set);
/* Reply callback for a grant-shrink request.  The obdo saved in the async
 * args holds the amount that was offered back to the server; that amount
 * can be re-credited to the client with __osc_update_grant(), and any grant
 * carried in the reply body is added through osc_update_grant().
 * NOTE(review): the condition under which the offered grant is re-credited
 * (presumably a failed request) is on a line that this listing omits. */
752 static int osc_shrink_grant_interpret(const struct lu_env *env,
753 struct ptlrpc_request *req,
756 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
757 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
758 struct ost_body *body;
761 __osc_update_grant(cli, oa->o_grant);
765 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
767 osc_update_grant(cli, body);
/* Piggy-back a grant shrink on an outgoing obdo: move one quarter of the
 * available grant from the client into oa->o_grant (under
 * cl_loi_list_lock), flag the obdo with OBD_FL_SHRINK_GRANT and schedule
 * the next shrink check.  OBD_MD_FLFLAGS is made valid first if it was not
 * already.
 * NOTE(review): any initialisation of o_flags inside that branch is on a
 * line that this listing omits. */
773 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
775 spin_lock(&cli->cl_loi_list_lock);
776 oa->o_grant = cli->cl_avail_grant / 4;
777 cli->cl_avail_grant -= oa->o_grant;
778 spin_unlock(&cli->cl_loi_list_lock);
779 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
780 oa->o_valid |= OBD_MD_FLFLAGS;
783 oa->o_flags |= OBD_FL_SHRINK_GRANT;
784 osc_update_next_shrink(cli);
787 /* Shrink the current grant, either from some large amount to enough for a
788 * full set of in-flight RPCs, or if we have already shrunk to that limit
789 * then to enough for a single RPC. This avoids keeping more grant than
790 * needed, and avoids shrinking the grant piecemeal. */
/* The first target is (cl_max_rpcs_in_flight + 1) RPCs worth of bytes; the
 * comparison against cl_avail_grant is done under cl_loi_list_lock.  The
 * result of osc_shrink_grant_to_target() is returned unchanged. */
791 static int osc_shrink_grant(struct client_obd *cli)
793 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
794 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
796 spin_lock(&cli->cl_loi_list_lock);
797 if (cli->cl_avail_grant <= target_bytes)
798 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
799 spin_unlock(&cli->cl_loi_list_lock);
801 return osc_shrink_grant_to_target(cli, target_bytes);
/* Give grant back to the server until only @target_bytes remain available.
 * @target_bytes is raised to at least one RPC's worth of bytes; nothing is
 * done when the client already holds no more than the target.  Otherwise an
 * ost_body is filled in: cache state via osc_announce_cached(), the surplus
 * in o_grant (removed from cl_avail_grant under the lock), and the
 * OBD_FL_SHRINK_GRANT flag.  It is then sent with osc_set_info_async()
 * under KEY_GRANT_SHRINK.  The surplus can be credited back with
 * __osc_update_grant().
 * NOTE(review): allocation and release of "body", the early-return value
 * and the condition for re-crediting the grant (presumably a send failure)
 * are on lines that this listing omits. */
804 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
807 struct ost_body *body;
810 spin_lock(&cli->cl_loi_list_lock);
811 /* Don't shrink if we are already above or below the desired limit
812 * We don't want to shrink below a single RPC, as that will negatively
813 * impact block allocation and long-term performance. */
814 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
815 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
817 if (target_bytes >= cli->cl_avail_grant) {
818 spin_unlock(&cli->cl_loi_list_lock);
821 spin_unlock(&cli->cl_loi_list_lock);
827 osc_announce_cached(cli, &body->oa, 0);
/* The lock was dropped above, so the surplus is recomputed from the current
 * cl_avail_grant. */
829 spin_lock(&cli->cl_loi_list_lock);
830 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
831 cli->cl_avail_grant = target_bytes;
832 spin_unlock(&cli->cl_loi_list_lock);
833 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
834 body->oa.o_valid |= OBD_MD_FLFLAGS;
835 body->oa.o_flags = 0;
837 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
838 osc_update_next_shrink(cli);
840 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
841 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
842 sizeof(*body), body, NULL);
844 __osc_update_grant(cli, body->oa.o_grant);
/* Decide whether @client should offer grant back to the server now.  The
 * server must have negotiated OBD_CONNECT_GRANT_SHRINK.  Once the current
 * time is within five ticks of cl_next_shrink_grant (or past it), the
 * import state and the available grant are compared against one RPC's
 * worth of bytes; osc_update_next_shrink() re-arms the timer.
 * NOTE(review): the return statements and the exact branch in which the
 * timer is re-armed are on lines that this listing omits. */
849 static int osc_should_shrink_grant(struct client_obd *client)
851 cfs_time_t time = cfs_time_current();
852 cfs_time_t next_shrink = client->cl_next_shrink_grant;
854 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
855 OBD_CONNECT_GRANT_SHRINK) == 0)
858 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
859 /* Get the current RPC size directly, instead of going via:
860 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
861 * Keep comment here so that it can be found by searching. */
862 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
864 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
865 client->cl_avail_grant > brw_size)
868 osc_update_next_shrink(client);
/* Timeout callback: walk every client_obd linked on the timeout item's
 * ti_obd_list and shrink the grant of each one for which
 * osc_should_shrink_grant() says so.  @data is not used in the visible
 * code. */
873 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
875 struct client_obd *client;
877 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
878 if (osc_should_shrink_grant(client))
879 osc_shrink_grant(client);
/* Register @client for periodic grant shrinking: add it as a timeout client
 * with osc_grant_shrink_grant_cb() as callback, linked through
 * cl_grant_shrink_list, and schedule its first shrink check.  A failure to
 * register is logged with CERROR.
 * NOTE(review): one argument of ptlrpc_add_timeout_client() and the return
 * statements are on lines that this listing omits. */
884 static int osc_add_shrink_grant(struct client_obd *client)
888 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
890 osc_grant_shrink_grant_cb, NULL,
891 &client->cl_grant_shrink_list);
893 CERROR("add grant client %s error %d\n", cli_name(client), rc);
896 CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
897 osc_update_next_shrink(client);
/* Unregister @client from periodic grant shrinking; returns the result of
 * ptlrpc_del_timeout_client().
 * NOTE(review): the second argument of that call is on a line that this
 * listing omits. */
901 static int osc_del_shrink_grant(struct client_obd *client)
903 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialise the client's grant state from the connect data @ocd.  Under
 * cl_loi_list_lock, cl_avail_grant starts from ocd_grant; unless the import
 * is in the EVICTED state the reserved grant and the dirty amount (dirty
 * grant with GRANT_PARAM, otherwise dirty pages in bytes) are subtracted.
 * A negative result is logged and replaced by ocd_grant.  With GRANT_PARAM
 * the extent tax, chunk size and maximum extent size come from @ocd;
 * without it fixed defaults are used.  Finally the client is registered for
 * grant shrinking when the server supports it and it is not already on a
 * shrink list. */
907 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
910 * ocd_grant is the total grant amount we're expect to hold: if we've
911 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
912 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
915 * race is tolerable here: if we're evicted, but imp_state already
916 * left EVICTED state, then cl_dirty_pages must be 0 already.
918 spin_lock(&cli->cl_loi_list_lock);
919 cli->cl_avail_grant = ocd->ocd_grant;
920 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
921 cli->cl_avail_grant -= cli->cl_reserved_grant;
922 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
923 cli->cl_avail_grant -= cli->cl_dirty_grant;
925 cli->cl_avail_grant -=
926 cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
929 if (cli->cl_avail_grant < 0) {
930 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
931 cli_name(cli), cli->cl_avail_grant,
932 ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
933 /* workaround for servers which do not have the patch from
935 cli->cl_avail_grant = ocd->ocd_grant;
938 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
941 /* overhead for each extent insertion */
942 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
943 /* determine the appropriate chunk size used by osc_extent. */
944 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT,
945 ocd->ocd_grant_blkbits);
946 /* determine maximum extent size, in #pages */
947 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
948 cli->cl_max_extent_pages = size >> PAGE_CACHE_SHIFT;
/* Never allow a zero-page extent limit. */
949 if (cli->cl_max_extent_pages == 0)
950 cli->cl_max_extent_pages = 1;
952 cli->cl_grant_extent_tax = 0;
953 cli->cl_chunkbits = PAGE_CACHE_SHIFT;
954 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
956 spin_unlock(&cli->cl_loi_list_lock);
958 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
959 "chunk bits: %d cl_max_extent_pages: %d\n",
961 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
962 cli->cl_max_extent_pages);
964 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
965 list_empty(&cli->cl_grant_shrink_list))
966 osc_add_shrink_grant(cli);
969 /* We assume that the reason this OSC got a short read is because it read
970 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
971 * via the LOV, and it _knows_ it's reading inside the file, it's just that
972 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill everything in @pga past the first @nob_read bytes: pages that
 * were read completely are skipped, the page containing the end of data has
 * its tail cleared, and all remaining pages are cleared entirely.  Pages
 * are mapped with kmap() and addressed at their in-page offset.
 * NOTE(review): the loop index updates and the matching unmap calls are on
 * lines that this listing omits. */
973 static void handle_short_read(int nob_read, size_t page_count,
974 struct brw_page **pga)
979 /* skip bytes read OK */
980 while (nob_read > 0) {
981 LASSERT (page_count > 0);
983 if (pga[i]->count > nob_read) {
984 /* EOF inside this page */
985 ptr = kmap(pga[i]->pg) +
986 (pga[i]->off & ~PAGE_MASK);
987 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
994 nob_read -= pga[i]->count;
999 /* zero remaining pages */
1000 while (page_count-- > 0) {
1001 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1002 memset(ptr, 0, pga[i]->count);
1008 static int check_write_rcs(struct ptlrpc_request *req,
1009 int requested_nob, int niocount,
1010 size_t page_count, struct brw_page **pga)
1015 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1016 sizeof(*remote_rcs) *
1018 if (remote_rcs == NULL) {
1019 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1023 /* return error if any niobuf was in error */
1024 for (i = 0; i < niocount; i++) {
1025 if ((int)remote_rcs[i] < 0)
1026 return(remote_rcs[i]);
1028 if (remote_rcs[i] != 0) {
1029 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1030 i, remote_rcs[i], req);
1035 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1036 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1037 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Tell whether two consecutive brw pages can share one remote niobuf.  When
 * the flags are identical the pages merge if @p2 starts exactly where @p1
 * ends.  When the flags differ, a warning is printed if they differ in any
 * bit outside the listed set of flags.
 * NOTE(review): the value returned when the flags differ is on a line that
 * this listing omits. */
1044 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1046 if (p1->flag != p2->flag) {
1047 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1048 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1049 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1051 /* warn if we try to combine flags that we don't know to be
1052 * safe to combine */
1053 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1054 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1055 "report this at https://jira.hpdd.intel.com/\n",
1056 p1->flag, p2->flag);
1061 return (p1->off + p1->count == p2->off);
/* Compute the checksum of the first @nob bytes described by @pga using the
 * hash algorithm that corresponds to @cksum_type.  Each page contributes
 * min(page count, remaining bytes) starting at its in-page offset.  Two
 * fault-injection hooks are visible: on reads the first page's data can be
 * overwritten with "bad1" before hashing, and on writes the final checksum
 * can be altered instead of the data.
 * NOTE(review): if the hash cannot be initialised, PTR_ERR() is returned
 * through the u32 return type, so callers cannot tell it apart from a
 * checksum value.  The loop index updates, the kunmap and the final return
 * are on lines that this listing omits. */
1064 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1065 struct brw_page **pga, int opc,
1066 cksum_type_t cksum_type)
1070 struct cfs_crypto_hash_desc *hdesc;
1071 unsigned int bufsize;
1073 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1075 LASSERT(pg_count > 0);
1077 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1078 if (IS_ERR(hdesc)) {
1079 CERROR("Unable to initialize checksum hash %s\n",
1080 cfs_crypto_hash_name(cfs_alg));
1081 return PTR_ERR(hdesc);
1084 while (nob > 0 && pg_count > 0) {
1085 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1087 /* corrupt the data before we compute the checksum, to
1088 * simulate an OST->client data error */
1089 if (i == 0 && opc == OST_READ &&
1090 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1091 unsigned char *ptr = kmap(pga[i]->pg);
1092 int off = pga[i]->off & ~PAGE_MASK;
1094 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1097 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1098 pga[i]->off & ~PAGE_MASK,
1100 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1101 (int)(pga[i]->off & ~PAGE_MASK));
1103 nob -= pga[i]->count;
1108 bufsize = sizeof(cksum);
1109 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1111 /* For sending we only compute the wrong checksum instead
1112 * of corrupting the data so it is still correct on a redo */
1113 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build (but do not send) a bulk read or write request for @page_count
 * pages in @pga against the object described by @oa.
 *
 * Visible steps:
 *  - allocate the request (writes come from a request pool, reads from the
 *    regular allocator);
 *  - count how many remote niobufs are needed by merging adjacent pages
 *    with can_merge_pages(), size the capsule buffers and pack the request;
 *  - route it to OST_IO_PORTAL and disable ptlrpc's own EINPROGRESS retry;
 *  - prepare the bulk descriptor and add every page to it while filling the
 *    niobuf array, asserting that the page array has no gaps and is sorted
 *    by offset;
 *  - announce cache/grant state, optionally piggy-back a grant shrink, and
 *    set up checksum flags (computing the checksum for writes);
 *  - record the request parameters in the async args.
 *
 * NOTE(review): several lines are missing from this listing, including the
 * failure checks after each allocation, the opc selection, the storing of
 * the result in *reqp and the way @resend is tested. */
1120 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1121 u32 page_count, struct brw_page **pga,
1122 struct ptlrpc_request **reqp, int resend)
1124 struct ptlrpc_request *req;
1125 struct ptlrpc_bulk_desc *desc;
1126 struct ost_body *body;
1127 struct obd_ioobj *ioobj;
1128 struct niobuf_remote *niobuf;
1129 int niocount, i, requested_nob, opc, rc;
1130 struct osc_brw_async_args *aa;
1131 struct req_capsule *pill;
1132 struct brw_page *pg_prev;
1135 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1136 RETURN(-ENOMEM); /* Recoverable */
1137 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1138 RETURN(-EINVAL); /* Fatal */
1140 if ((cmd & OBD_BRW_WRITE) != 0) {
1142 req = ptlrpc_request_alloc_pool(cli->cl_import,
1144 &RQF_OST_BRW_WRITE);
1147 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1152 for (niocount = i = 1; i < page_count; i++) {
1153 if (!can_merge_pages(pga[i - 1], pga[i]))
1157 pill = &req->rq_pill;
1158 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1160 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1161 niocount * sizeof(*niobuf));
1163 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1165 ptlrpc_request_free(req);
1168 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1169 ptlrpc_at_set_req_timeout(req);
1170 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1172 req->rq_no_retry_einprogress = 1;
1174 desc = ptlrpc_prep_bulk_imp(req, page_count,
1175 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1176 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1177 PTLRPC_BULK_PUT_SINK) |
1178 PTLRPC_BULK_BUF_KIOV,
1180 &ptlrpc_bulk_kiov_pin_ops);
1183 GOTO(out, rc = -ENOMEM);
1184 /* NB request now owns desc and will free it when it gets freed */
1186 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1187 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1188 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1189 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1191 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1193 obdo_to_ioobj(oa, ioobj);
1194 ioobj->ioo_bufcnt = niocount;
1195 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1196 * that might be sent for this request. The actual number is decided
1197 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1198 * "max - 1" for old client compatibility sending "0", and also so the
1199 * actual maximum is a power-of-two number, not one less. LU-1431 */
1200 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1201 LASSERT(page_count > 0);
/* Add each page to the bulk descriptor and build the niobuf array. */
1203 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1204 struct brw_page *pg = pga[i];
1205 int poff = pg->off & ~PAGE_MASK;
1207 LASSERT(pg->count > 0);
1208 /* make sure there is no gap in the middle of page array */
1209 LASSERTF(page_count == 1 ||
1210 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1211 ergo(i > 0 && i < page_count - 1,
1212 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1213 ergo(i == page_count - 1, poff == 0)),
1214 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1215 i, page_count, pg, pg->off, pg->count);
1216 LASSERTF(i == 0 || pg->off > pg_prev->off,
1217 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1218 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1220 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1221 pg_prev->pg, page_private(pg_prev->pg),
1222 pg_prev->pg->index, pg_prev->off);
1223 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1224 (pg->flag & OBD_BRW_SRVLOCK));
1226 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1227 requested_nob += pg->count;
1229 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1231 niobuf->rnb_len += pg->count;
1233 niobuf->rnb_offset = pg->off;
1234 niobuf->rnb_len = pg->count;
1235 niobuf->rnb_flags = pg->flag;
/* Sanity check: the niobuf cursor must have advanced by exactly niocount. */
1240 LASSERTF((void *)(niobuf - niocount) ==
1241 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1242 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1243 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1245 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1247 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1248 body->oa.o_valid |= OBD_MD_FLFLAGS;
1249 body->oa.o_flags = 0;
1251 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1254 if (osc_should_shrink_grant(cli))
1255 osc_shrink_grant_local(cli, &body->oa);
1257 /* size[REQ_REC_OFF] still sizeof (*body) */
1258 if (opc == OST_WRITE) {
1259 if (cli->cl_checksum &&
1260 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1261 /* store cl_cksum_type in a local variable since
1262 * it can be changed via lprocfs */
1263 cksum_type_t cksum_type = cli->cl_cksum_type;
1265 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1266 oa->o_flags &= OBD_FL_LOCAL_MASK;
1267 body->oa.o_flags = 0;
1269 body->oa.o_flags |= cksum_type_pack(cksum_type);
1270 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1271 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1275 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1277 /* save this in 'oa', too, for later checking */
1278 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1279 oa->o_flags |= cksum_type_pack(cksum_type);
1281 /* clear out the checksum flag, in case this is a
1282 * resend but cl_checksum is no longer set. b=11238 */
1283 oa->o_valid &= ~OBD_MD_FLCKSUM;
1285 oa->o_cksum = body->oa.o_cksum;
1286 /* 1 RC per niobuf */
1287 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1288 sizeof(__u32) * niocount);
1290 if (cli->cl_checksum &&
1291 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1292 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1293 body->oa.o_flags = 0;
1294 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1295 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1298 ptlrpc_request_set_replen(req);
1300 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1301 aa = ptlrpc_req_async_args(req);
1303 aa->aa_requested_nob = requested_nob;
1304 aa->aa_nio_count = niocount;
1305 aa->aa_page_count = page_count;
1309 INIT_LIST_HEAD(&aa->aa_oaps);
/* Re-fetch the start of the niobuf array for the trace message below. */
1312 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1313 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1314 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1315 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1319 ptlrpc_req_finished(req);
/*
 * Diagnose a write checksum as reported back by the server.
 *
 * @oa                 obdo from the reply; its flags select the checksum type
 *                     and its FID/object id are used in the error message
 * @peer               peer the request was sent to (printed as a NID)
 * @client_cksum       checksum the client computed when sending
 * @server_cksum       checksum the server returned
 * @nob, @page_count, @pga  the data that was written, used to recompute
 * @client_cksum_type  checksum type the client originally used
 *
 * When the two checksums agree this is only logged at D_PAGE.  Otherwise the
 * checksum is recomputed over @pga and one of four explanations is reported
 * on the console: the server used a different checksum type, the data changed
 * on the client after checksumming (recomputed == server), the data changed
 * in transit (recomputed == client), or neither value matches.
 *
 * NOTE(review): the return statements are not visible in this excerpt; the
 * caller in osc_brw_fini_request() tests the result as a boolean, so
 * presumably 0 means "checksum confirmed" and non-zero means "mismatch".
 */
1323 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1324 __u32 client_cksum, __u32 server_cksum, int nob,
1325 size_t page_count, struct brw_page **pga,
1326 cksum_type_t client_cksum_type)
1330 cksum_type_t cksum_type;
/* Fast path: server and client agree. */
1332 if (server_cksum == client_cksum) {
1333 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Mismatch: recompute the checksum locally to classify the failure. */
1337 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1339 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1342 if (cksum_type != client_cksum_type)
1343 msg = "the server did not use the checksum type specified in "
1344 "the original request - likely a protocol problem";
1345 else if (new_cksum == server_cksum)
1346 msg = "changed on the client after we checksummed it - "
1347 "likely false positive due to mmap IO (bug 11742)";
1348 else if (new_cksum == client_cksum)
1349 msg = "changed in transit before arrival at OST";
1351 msg = "changed in transit AND doesn't match the original - "
1352 "likely false positive due to mmap IO (bug 11742)";
/* The FID fields are printed as zero unless OBD_MD_FLFID is set in o_valid. */
1354 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1355 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1356 msg, libcfs_nid2str(peer->nid),
1357 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1358 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1359 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1360 POSTID(&oa->o_oi), pga[0]->off,
1361 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1362 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1363 "client csum now %x\n", client_cksum, client_cksum_type,
1364 server_cksum, cksum_type, new_cksum);
/*
 * Post-process the reply of a bulk read/write (BRW) request.
 *
 * @req  completed request; its rq_async_args hold the osc_brw_async_args
 * @rc   result so far (see the note directly below for its meaning on entry)
 *
 * Visible steps: unpack the OST body from the reply, apply user/group quota
 * flags for writes via osc_quota_setdq(), update grant via
 * osc_update_grant(), then
 *  - for OST_WRITE: unwrap the bulk with sptlrpc_cli_unwrap_bulk_write(),
 *    verify the checksum with check_write_checksum() when one was sent, and
 *    take the result of check_write_rcs();
 *  - for reads: unwrap the bulk, sanity-check rc against the requested and
 *    transferred byte counts, handle short reads, and compare the server's
 *    checksum with one computed locally over the received pages.
 * Last, lustre_get_wire_obdo() is called with aa->aa_oa and the reply's
 * body->oa (presumably copying the reply attributes into aa->aa_oa).
 *
 * NOTE(review): most return statements and error codes are not visible in
 * this excerpt; only -EAGAIN for a failed bulk-read unwrap is shown.
 */
1368 /* Note rc enters this function as number of bytes transferred */
1369 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1371 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1372 const lnet_process_id_t *peer =
1373 &req->rq_import->imp_connection->c_peer;
1374 struct client_obd *cli = aa->aa_cli;
1375 struct ost_body *body;
1376 u32 client_cksum = 0;
/* -EDQUOT is deliberately let through so the reply body is still processed. */
1379 if (rc < 0 && rc != -EDQUOT) {
1380 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1384 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1385 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1387 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1391 /* set/clear over quota flag for a uid/gid */
1392 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1393 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1394 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1396 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1397 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1399 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1402 osc_update_grant(cli, body);
1407 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1408 client_cksum = aa->aa_oa->o_cksum; /* save for later */
/* Write path. */
1410 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1412 CERROR("Unexpected +ve rc %d\n", rc);
1415 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1417 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1420 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1421 check_write_checksum(&body->oa, peer, client_cksum,
1422 body->oa.o_cksum, aa->aa_requested_nob,
1423 aa->aa_page_count, aa->aa_ppga,
1424 cksum_type_unpack(aa->aa_oa->o_flags)))
1427 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1428 aa->aa_page_count, aa->aa_ppga);
1432 /* The rest of this function executes only for OST_READs */
1434 /* if unwrap_bulk failed, return -EAGAIN to retry */
1435 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1437 GOTO(out, rc = -EAGAIN);
1439 if (rc > aa->aa_requested_nob) {
1440 CERROR("Unexpected rc %d (%d requested)\n", rc,
1441 aa->aa_requested_nob);
1445 if (rc != req->rq_bulk->bd_nob_transferred) {
1446 CERROR ("Unexpected rc %d (%d transferred)\n",
1447 rc, req->rq_bulk->bd_nob_transferred);
1451 if (rc < aa->aa_requested_nob)
1452 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* The server sent a checksum: recompute it over what was received. */
1454 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1455 static int cksum_counter;
1456 u32 server_cksum = body->oa.o_cksum;
1459 cksum_type_t cksum_type;
1461 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1462 body->oa.o_flags : 0);
1463 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1464 aa->aa_ppga, OST_READ,
/* A bulk sender different from the peer is reported as a router. */
1467 if (peer->nid != req->rq_bulk->bd_sender) {
1469 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1472 if (server_cksum != client_cksum) {
1473 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1474 "%s%s%s inode "DFID" object "DOSTID
1475 " extent ["LPU64"-"LPU64"]\n",
1476 req->rq_import->imp_obd->obd_name,
1477 libcfs_nid2str(peer->nid),
1479 body->oa.o_valid & OBD_MD_FLFID ?
1480 body->oa.o_parent_seq : (__u64)0,
1481 body->oa.o_valid & OBD_MD_FLFID ?
1482 body->oa.o_parent_oid : 0,
1483 body->oa.o_valid & OBD_MD_FLFID ?
1484 body->oa.o_parent_ver : 0,
1485 POSTID(&body->oa.o_oi),
1486 aa->aa_ppga[0]->off,
1487 aa->aa_ppga[aa->aa_page_count-1]->off +
1488 aa->aa_ppga[aa->aa_page_count-1]->count -
1490 CERROR("client %x, server %x, cksum_type %x\n",
1491 client_cksum, server_cksum, cksum_type);
1493 aa->aa_oa->o_cksum = client_cksum;
1497 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1500 } else if (unlikely(client_cksum)) {
1501 static int cksum_missed;
/* x & -x isolates the lowest set bit, so this is true only when
 * cksum_missed is zero or a power of two, which thins out the message. */
1504 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1505 CERROR("Checksum %u requested from %s but not sent\n",
1506 cksum_missed, libcfs_nid2str(peer->nid));
1512 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1513 aa->aa_oa, &body->oa);
/*
 * Rebuild and resubmit a BRW request after a recoverable error.
 *
 * @request  the request that failed
 * @aa       async args of @request (client, obdo, page array, oap and
 *           extent lists, resend counter)
 * @rc       the error being recovered from; used for the debug message and
 *           then overwritten by the result of osc_brw_prep_request()
 *
 * A new request is prepared for the same opcode, client, obdo and pages.
 * It inherits the interpret and commit callbacks, the async args and the
 * import generation of the old request.  The oap and extent lists are moved
 * over with list_splice_init(), and each oap that held a reference on a
 * request drops it and takes one on the new request.  rq_sent is set to now
 * plus aa_resends seconds, capped at the new request's timeout, before the
 * request is handed to ptlrpcd_add_req().
 *
 * NOTE(review): the error checks and return statements are not visible in
 * this excerpt.
 */
1518 static int osc_brw_redo_request(struct ptlrpc_request *request,
1519 struct osc_brw_async_args *aa, int rc)
1521 struct ptlrpc_request *new_req;
1522 struct osc_brw_async_args *new_aa;
1523 struct osc_async_page *oap;
/* -EINPROGRESS is logged at D_RPCTRACE, every other error at D_ERROR. */
1526 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1527 "redo for recoverable error %d", rc);
1529 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1530 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1531 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1532 aa->aa_ppga, &new_req, 1);
/* If any page of the old request was interrupted, drop the new request. */
1536 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1537 if (oap->oap_request != NULL) {
1538 LASSERTF(request == oap->oap_request,
1539 "request %p != oap_request %p\n",
1540 request, oap->oap_request);
1541 if (oap->oap_interrupted) {
1542 ptlrpc_req_finished(new_req);
1547 /* New request takes over pga and oaps from old request.
1548 * Note that copying a list_head doesn't work, need to move it... */
1550 new_req->rq_interpret_reply = request->rq_interpret_reply;
1551 new_req->rq_async_args = request->rq_async_args;
1552 new_req->rq_commit_cb = request->rq_commit_cb;
1553 /* cap resend delay to the current request timeout, this is similar to
1554 * what ptlrpc does (see after_reply()) */
1555 if (aa->aa_resends > new_req->rq_timeout)
1556 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1558 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1559 new_req->rq_generation_set = 1;
1560 new_req->rq_import_generation = request->rq_import_generation;
1562 new_aa = ptlrpc_req_async_args(new_req);
/* The list heads were copied by value above; re-initialise and splice. */
1564 INIT_LIST_HEAD(&new_aa->aa_oaps);
1565 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1566 INIT_LIST_HEAD(&new_aa->aa_exts);
1567 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1568 new_aa->aa_resends = aa->aa_resends;
/* Move each oap's request reference from the old request to the new one. */
1570 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1571 if (oap->oap_request) {
1572 ptlrpc_req_finished(oap->oap_request);
1573 oap->oap_request = ptlrpc_request_addref(new_req);
1577 /* XXX: This code will run into problem if we're going to support
1578 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1579 * and wait for all of them to be finished. We should inherit request
1580 * set from old request. */
1581 ptlrpcd_add_req(new_req);
1583 DEBUG_REQ(D_INFO, new_req, "new request");
1588 * ugh, we want disk allocation on the target to happen in offset order. we'll
1589 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1590 * fine for our small page arrays and doesn't require allocation. it's an
1591 * insertion sort that swaps elements that are strides apart, shrinking the
1592 * stride down until it's '1' and the array is sorted.
/*
 * Sort @array (@num brw_page pointers) in place by ascending ->off.
 *
 * The initial stride is grown through 1, 4, 13, ... (stride * 3 + 1) until
 * it is no longer below @num; the passes then run with decreasing strides
 * until a final pass at stride 1.  Within a pass, an element is moved left
 * past every element one stride away whose ->off is larger.
 *
 * NOTE(review): the statements that shrink the stride and store the saved
 * element are not visible in this excerpt.
 */
1594 static void sort_brw_pages(struct brw_page **array, int num)
1597 struct brw_page *tmp;
1601 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1606 for (i = stride ; i < num ; i++) {
1609 while (j >= stride && array[j - stride]->off > tmp->off) {
1610 array[j] = array[j - stride];
1615 } while (stride > 1);
/*
 * Free a brw_page pointer array.
 *
 * @ppga   array to free; must not be NULL (asserted)
 * @count  number of pointers in the array, used to compute the size passed
 *         to OBD_FREE()
 *
 * Only the pointer array is freed here; nothing is done to the pages the
 * entries point at.
 */
1618 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1620 LASSERT(ppga != NULL);
1621 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Reply interpreter for BRW requests (osc_build_rpc() installs it as
 * rq_interpret_reply).
 *
 * @env   execution environment
 * @req   the completed request
 * @data  the request's osc_brw_async_args
 * @rc    completion status; first passed through osc_brw_fini_request()
 *
 * A recoverable error is retried through osc_brw_redo_request() when the
 * error is -EINPROGRESS or client_should_resend() allows it; if the import
 * generation has changed since the request was built, the situation is only
 * logged.  Afterwards the object attributes (blocks, m/a/ctime and, for
 * writes, size and KMS) are updated from aa->aa_oa under
 * cl_object_attr_lock(), the obdo and the page array are released, every
 * extent on aa_exts is completed with osc_extent_finish(), the in-flight
 * counter for the request's direction is decremented under cl_loi_list_lock
 * and osc_io_unplug() is called.
 *
 * NOTE(review): the condition guarding the attribute update, the handling
 * after a redo, and the return statement are not visible in this excerpt.
 */
1624 static int brw_interpret(const struct lu_env *env,
1625 struct ptlrpc_request *req, void *data, int rc)
1627 struct osc_brw_async_args *aa = data;
1628 struct osc_extent *ext;
1629 struct osc_extent *tmp;
1630 struct client_obd *cli = aa->aa_cli;
1633 rc = osc_brw_fini_request(req, rc);
1634 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1635 /* When server return -EINPROGRESS, client should always retry
1636 * regardless of the number of times the bulk was resent already. */
1637 if (osc_recoverable_error(rc)) {
1638 if (req->rq_import_generation !=
1639 req->rq_import->imp_generation) {
1640 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1641 ""DOSTID", rc = %d.\n",
1642 req->rq_import->imp_obd->obd_name,
1643 POSTID(&aa->aa_oa->o_oi), rc);
1644 } else if (rc == -EINPROGRESS ||
1645 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1646 rc = osc_brw_redo_request(req, aa, rc);
1648 CERROR("%s: too many resent retries for object: "
1649 ""LPU64":"LPU64", rc = %d.\n",
1650 req->rq_import->imp_obd->obd_name,
1651 POSTID(&aa->aa_oa->o_oi), rc);
1656 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1661 struct obdo *oa = aa->aa_oa;
1662 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1663 unsigned long valid = 0;
1664 struct cl_object *obj;
1665 struct osc_async_page *last;
/* The last page of the array identifies the object and the I/O end offset. */
1667 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1668 obj = osc2cl(last->oap_obj);
1670 cl_object_attr_lock(obj);
1671 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1672 attr->cat_blocks = oa->o_blocks;
1673 valid |= CAT_BLOCKS;
1675 if (oa->o_valid & OBD_MD_FLMTIME) {
1676 attr->cat_mtime = oa->o_mtime;
1679 if (oa->o_valid & OBD_MD_FLATIME) {
1680 attr->cat_atime = oa->o_atime;
1683 if (oa->o_valid & OBD_MD_FLCTIME) {
1684 attr->cat_ctime = oa->o_ctime;
1688 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1689 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1690 loff_t last_off = last->oap_count + last->oap_obj_off +
1693 /* Change file size if this is an out of quota or
1694 * direct IO write and it extends the file size */
1695 if (loi->loi_lvb.lvb_size < last_off) {
1696 attr->cat_size = last_off;
1699 /* Extend KMS if it's not a lockless write */
1700 if (loi->loi_kms < last_off &&
1701 oap2osc_page(last)->ops_srvlock == 0) {
1702 attr->cat_kms = last_off;
1708 cl_object_attr_update(env, obj, attr, valid);
1709 cl_object_attr_unlock(obj);
1711 OBDO_FREE(aa->aa_oa);
/* Only successfully written pages are accounted as unstable. */
1713 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1714 osc_inc_unstable_pages(req);
/* Complete every extent that was carried by this RPC. */
1716 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1717 list_del_init(&ext->oe_link);
1718 osc_extent_finish(env, ext, 1, rc);
1720 LASSERT(list_empty(&aa->aa_exts));
1721 LASSERT(list_empty(&aa->aa_oaps));
1723 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1724 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1726 spin_lock(&cli->cl_loi_list_lock);
1727 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1728 * is called so we know whether to go to sync BRWs or wait for more
1729 * RPCs to complete */
1730 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1731 cli->cl_w_in_flight--;
1733 cli->cl_r_in_flight--;
1734 osc_wake_cache_waiters(cli);
1735 spin_unlock(&cli->cl_loi_list_lock);
1737 osc_io_unplug(env, cli, NULL);
/*
 * Commit callback for BRW requests (osc_build_rpc() installs it as
 * rq_commit_cb).
 *
 * @req  the request whose commit is being reported
 *
 * Under rq_lock: when rq_unstable is set it is cleared and, after dropping
 * the lock, osc_dec_unstable_pages() is called; the other visible path sets
 * rq_committed and unlocks.
 *
 * NOTE(review): the line separating the two paths is not visible in this
 * excerpt -- presumably an else branch; confirm.
 */
1741 static void brw_commit(struct ptlrpc_request *req)
1743 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1744 * this called via the rq_commit_cb, I need to ensure
1745 * osc_dec_unstable_pages is still called. Otherwise unstable
1746 * pages may be leaked. */
1747 spin_lock(&req->rq_lock);
1748 if (likely(req->rq_unstable)) {
1749 req->rq_unstable = 0;
1750 spin_unlock(&req->rq_lock);
1752 osc_dec_unstable_pages(req);
1754 req->rq_committed = 1;
1755 spin_unlock(&req->rq_lock);
1760 * Build an RPC by the list of extent @ext_list. The caller must ensure
1761 * that the total pages in this list are NOT over max pages per RPC.
1762 * Extents in the list must be in OES_RPC state.
/*
 * @env       execution environment
 * @cli       client obd the RPC is built for
 * @ext_list  non-empty list of extents in OES_RPC state; on success they are
 *            spliced onto the request's aa_exts list, on failure each one is
 *            finished with osc_extent_finish(env, ext, 0, rc)
 * @cmd       BRW command; compared against OBD_BRW_READ / OBD_BRW_WRITE
 *
 * Gathers every page of every extent into a brw_page pointer array, fills
 * the obdo through cl_req_attr_set(), sorts the pages by offset and prepares
 * the request with osc_brw_prep_request().  The request gets brw_commit()
 * and brw_interpret() as callbacks, the in-flight counters and histograms
 * are updated under cl_loi_list_lock, and the request is queued with
 * ptlrpcd_add_req().
 *
 * NOTE(review): several declarations, conditions and the return statement
 * are not visible in this excerpt.
 */
1764 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1765 struct list_head *ext_list, int cmd)
1767 struct ptlrpc_request *req = NULL;
1768 struct osc_extent *ext;
1769 struct brw_page **pga = NULL;
1770 struct osc_brw_async_args *aa = NULL;
1771 struct obdo *oa = NULL;
1772 struct osc_async_page *oap;
1773 struct osc_object *obj = NULL;
1774 struct cl_req_attr *crattr = NULL;
1775 loff_t starting_offset = OBD_OBJECT_EOF;
1776 loff_t ending_offset = 0;
1780 bool soft_sync = false;
1781 bool interrupted = false;
1785 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1786 struct ost_body *body;
1788 LASSERT(!list_empty(ext_list));
1790 /* add pages into rpc_list to build BRW rpc */
1791 list_for_each_entry(ext, ext_list, oe_link) {
1792 LASSERT(ext->oe_state == OES_RPC);
1793 mem_tight |= ext->oe_memalloc;
1794 grant += ext->oe_grants;
1795 page_count += ext->oe_nr_pages;
1800 soft_sync = osc_over_unstable_soft_limit(cli);
1802 mpflag = cfs_memory_pressure_get_and_set();
1804 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1806 GOTO(out, rc = -ENOMEM);
1810 GOTO(out, rc = -ENOMEM);
/* Fill pga[] and track the lowest and highest offsets covered by the RPC. */
1813 list_for_each_entry(ext, ext_list, oe_link) {
1814 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1816 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1818 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1819 pga[i] = &oap->oap_brw_page;
1820 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1823 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1824 if (starting_offset == OBD_OBJECT_EOF ||
1825 starting_offset > oap->oap_obj_off)
1826 starting_offset = oap->oap_obj_off;
1828 LASSERT(oap->oap_page_off == 0);
1829 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1830 ending_offset = oap->oap_obj_off +
1833 LASSERT(oap->oap_page_off + oap->oap_count ==
1835 if (oap->oap_interrupted)
1840 /* first page in the list */
1841 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1843 crattr = &osc_env_info(env)->oti_req_attr;
1844 memset(crattr, 0, sizeof(*crattr));
1845 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1846 crattr->cra_flags = ~0ULL;
1847 crattr->cra_page = oap2cl_page(oap);
1848 crattr->cra_oa = oa;
1849 cl_req_attr_set(env, osc2cl(obj), crattr);
1851 if (cmd == OBD_BRW_WRITE)
1852 oa->o_grant_used = grant;
1854 sort_brw_pages(pga, page_count);
1855 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1857 CERROR("prep_req failed: %d\n", rc);
1861 req->rq_commit_cb = brw_commit;
1862 req->rq_interpret_reply = brw_interpret;
1863 req->rq_memalloc = mem_tight != 0;
1864 oap->oap_request = ptlrpc_request_addref(req);
1865 if (interrupted && !req->rq_intr)
1866 ptlrpc_mark_interrupted(req);
1868 /* Need to update the timestamps after the request is built in case
1869 * we race with setattr (locally or in queue at OST). If OST gets
1870 * later setattr before earlier BRW (as determined by the request xid),
1871 * the OST will not use BRW timestamps. Sadly, there is no obvious
1872 * way to do this in a single call. bug 10150 */
1873 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1874 crattr->cra_oa = &body->oa;
1875 crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1876 cl_req_attr_set(env, osc2cl(obj), crattr);
1877 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
/* Hand the page and extent lists over to the request's async args. */
1879 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1880 aa = ptlrpc_req_async_args(req);
1881 INIT_LIST_HEAD(&aa->aa_oaps);
1882 list_splice_init(&rpc_list, &aa->aa_oaps);
1883 INIT_LIST_HEAD(&aa->aa_exts);
1884 list_splice_init(ext_list, &aa->aa_exts);
1886 spin_lock(&cli->cl_loi_list_lock);
/* From here on starting_offset is a page index, not a byte offset. */
1887 starting_offset >>= PAGE_CACHE_SHIFT;
1888 if (cmd == OBD_BRW_READ) {
1889 cli->cl_r_in_flight++;
1890 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1891 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1892 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1893 starting_offset + 1);
1895 cli->cl_w_in_flight++;
1896 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1897 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1898 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1899 starting_offset + 1);
1901 spin_unlock(&cli->cl_loi_list_lock);
1903 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1904 page_count, aa, cli->cl_r_in_flight,
1905 cli->cl_w_in_flight);
1907 ptlrpcd_add_req(req);
1913 cfs_memory_pressure_restore(mpflag);
/* Failure cleanup: no request may exist at this point. */
1916 LASSERT(req == NULL);
1921 OBD_FREE(pga, sizeof(*pga) * page_count);
1922 /* this should happen rarely and is pretty bad, it makes the
1923 * pending list not follow the dirty order */
1924 while (!list_empty(ext_list)) {
1925 ext = list_entry(ext_list->next, struct osc_extent,
1927 list_del_init(&ext->oe_link);
1928 osc_extent_finish(env, ext, 0, rc);
/*
 * Attach @data to @lock as its l_ast_data if the lock has none yet.
 *
 * @lock  DLM lock; must not be NULL (asserted)
 * @data  value to store in, and compare against, lock->l_ast_data
 *
 * Both the assignment and the comparison run under lock_res_and_lock().
 *
 * NOTE(review): the statement executed when l_ast_data == @data and the
 * return statement are not visible in this excerpt; callers treat a non-zero
 * result as "the lock's l_ast_data is @data" -- confirm.
 */
1934 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
1938 LASSERT(lock != NULL);
1940 lock_res_and_lock(lock);
1942 if (lock->l_ast_data == NULL)
1943 lock->l_ast_data = data;
1944 if (lock->l_ast_data == data)
1947 unlock_res_and_lock(lock);
/*
 * Finish a lock enqueue: derive the final status, run the caller's upcall
 * and drop the lock reference held on behalf of the enqueue.
 *
 * @req      enqueue request; its reply is read for intent enqueues
 * @upcall   callback invoked as upcall(@cookie, @lockh, errcode)
 * @cookie   opaque argument for @upcall
 * @lockh    handle of the lock
 * @mode     lock mode used with ldlm_lock_decref()
 * @flags    in/out LDLM flags; LDLM_FL_HAS_INTENT is read and
 *           LDLM_FL_LVB_READY may be set
 * @agl      not referenced in the lines visible here
 * @errcode  result of the enqueue
 *
 * For an intent enqueue that ended in ELDLM_LOCK_ABORTED, a non-zero
 * lock_policy_res1 from the reply (converted with ptlrpc_status_ntoh())
 * replaces @errcode.
 *
 * NOTE(review): the return statement is not visible; rc is assigned the
 * upcall's result.
 */
1952 static int osc_enqueue_fini(struct ptlrpc_request *req,
1953 osc_enqueue_upcall_f upcall, void *cookie,
1954 struct lustre_handle *lockh, enum ldlm_mode mode,
1955 __u64 *flags, int agl, int errcode)
1957 bool intent = *flags & LDLM_FL_HAS_INTENT;
1961 /* The request was created before ldlm_cli_enqueue call. */
1962 if (intent && errcode == ELDLM_LOCK_ABORTED) {
1963 struct ldlm_reply *rep;
1965 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1966 LASSERT(rep != NULL);
1968 rep->lock_policy_res1 =
1969 ptlrpc_status_ntoh(rep->lock_policy_res1);
1970 if (rep->lock_policy_res1)
1971 errcode = rep->lock_policy_res1;
1973 *flags |= LDLM_FL_LVB_READY;
1974 } else if (errcode == ELDLM_OK) {
1975 *flags |= LDLM_FL_LVB_READY;
1978 /* Call the update callback. */
1979 rc = (*upcall)(cookie, lockh, errcode);
1981 /* release the reference taken in ldlm_cli_enqueue() */
1982 if (errcode == ELDLM_LOCK_MATCHED)
1984 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1985 ldlm_lock_decref(lockh, mode);
/*
 * Reply interpreter for asynchronous lock enqueues (osc_enqueue_base()
 * installs it as rq_interpret_reply).
 *
 * @env  execution environment (not referenced in the visible lines)
 * @req  the completed enqueue request
 * @aa   enqueue arguments saved in the request by osc_enqueue_base()
 * @rc   completion status of the request
 *
 * Visible sequence: look up the lock from aa->oa_lockh (asserting that it
 * exists), take an extra reference with ldlm_lock_addref(), run two
 * fail-injection delays, complete the DLM side with ldlm_cli_enqueue_fini(),
 * complete the OSC side with osc_enqueue_fini(), run a third fail-injection
 * delay, then drop the extra reference and the handle2lock reference.
 *
 * When aa->oa_lvb and aa->oa_flags are both NULL (asserted in a branch whose
 * condition is not visible here), a local flags variable is substituted for
 * aa->oa_flags.
 */
1990 static int osc_enqueue_interpret(const struct lu_env *env,
1991 struct ptlrpc_request *req,
1992 struct osc_enqueue_args *aa, int rc)
1994 struct ldlm_lock *lock;
1995 struct lustre_handle *lockh = &aa->oa_lockh;
1996 enum ldlm_mode mode = aa->oa_mode;
1997 struct ost_lvb *lvb = aa->oa_lvb;
1998 __u32 lvb_len = sizeof(*lvb);
2003 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2005 lock = ldlm_handle2lock(lockh);
2006 LASSERTF(lock != NULL,
2007 "lockh "LPX64", req %p, aa %p - client evicted?\n",
2008 lockh->cookie, req, aa);
2010 /* Take an additional reference so that a blocking AST that
2011 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2012 * to arrive after an upcall has been executed by
2013 * osc_enqueue_fini(). */
2014 ldlm_lock_addref(lockh, mode);
2016 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2017 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2019 /* Let CP AST to grant the lock first. */
2020 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2023 LASSERT(aa->oa_lvb == NULL);
2024 LASSERT(aa->oa_flags == NULL);
2025 aa->oa_flags = &flags;
2028 /* Complete obtaining the lock procedure. */
2029 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2030 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2032 /* Complete osc stuff. */
2033 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2034 aa->oa_flags, aa->oa_agl, rc);
2036 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2038 ldlm_lock_decref(lockh, mode);
2039 LDLM_LOCK_PUT(lock);
/*
 * Sentinel request-set pointer, deliberately not a real set (its value is
 * (void *)1).  osc_enqueue_base() compares its rqset argument against it and,
 * on a match, queues the request with ptlrpcd_add_req() instead of adding it
 * to a caller-supplied set with ptlrpc_set_add_req().
 */
2043 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2045 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2046 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2047 * other synchronous requests, however keeping some locks and trying to obtain
2048 * others may take a considerable amount of time in a case of ost failure; and
2049 * when other sync requests do not get released lock from a client, the client
2050 * is evicted from the cluster -- such scenarios make life difficult, so
2051 * release locks just after they are obtained. */
/*
 * Obtain an extent lock, reusing a cached one when possible.
 *
 * @exp        export to enqueue through
 * @res_id     resource to lock
 * @flags      in/out LDLM flags; LDLM_FL_HAS_INTENT, LDLM_FL_TEST_LOCK and
 *             LDLM_FL_MATCH_LOCK are read, LDLM_FL_LVB_READY may be set and
 *             LDLM_FL_BLOCK_GRANTED is cleared before enqueueing
 * @policy     extent to lock; modified in place so that it starts and ends
 *             on page boundaries
 * @lvb        buffer handed to ldlm_cli_enqueue() for the lock value block
 * @kms_valid  not referenced in the lines visible here
 * @upcall, @cookie  completion callback and its argument
 * @einfo      enqueue parameters (mode, type, callback data)
 * @rqset      request set for async enqueues, or PTLRPCD_SET to use ptlrpcd
 * @async      passed to ldlm_cli_enqueue(); the async path stores the
 *             arguments in the request and returns without waiting
 * @agl        not referenced in the lines visible here
 *
 * First ldlm_lock_match() is tried.  A matched lock that accepts
 * einfo->ei_cbdata through osc_set_lock_data() is reported to @upcall with
 * ELDLM_LOCK_MATCHED.  Otherwise a new lock is enqueued with
 * ldlm_cli_enqueue(); asynchronous enqueues complete in
 * osc_enqueue_interpret(), synchronous ones call osc_enqueue_fini() directly.
 *
 * NOTE(review): many conditions and all return statements are not visible in
 * this excerpt.
 */
2052 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2053 __u64 *flags, union ldlm_policy_data *policy,
2054 struct ost_lvb *lvb, int kms_valid,
2055 osc_enqueue_upcall_f upcall, void *cookie,
2056 struct ldlm_enqueue_info *einfo,
2057 struct ptlrpc_request_set *rqset, int async, int agl)
2059 struct obd_device *obd = exp->exp_obd;
2060 struct lustre_handle lockh = { 0 };
2061 struct ptlrpc_request *req = NULL;
2062 int intent = *flags & LDLM_FL_HAS_INTENT;
2063 __u64 match_flags = *flags;
2064 enum ldlm_mode mode;
2068 /* Filesystem lock extents are extended to page boundaries so that
2069 * dealing with the page cache is a little smoother. */
2070 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2071 policy->l_extent.end |= ~PAGE_MASK;
2074 * kms is not valid when either object is completely fresh (so that no
2075 * locks are cached), or object was evicted. In the latter case cached
2076 * lock cannot be used, because it would prime inode state with
2077 * potentially stale LVB.
2082 /* Next, search for already existing extent locks that will cover us */
2083 /* If we're trying to read, we also search for an existing PW lock. The
2084 * VFS and page cache already protect us locally, so lots of readers/
2085 * writers can share a single PW lock.
2087 * There are problems with conversion deadlocks, so instead of
2088 * converting a read lock to a write lock, we'll just enqueue a new
2091 * At some point we should cancel the read lock instead of making them
2092 * send us a blocking callback, but there are problems with canceling
2093 * locks out from other users right now, too. */
2094 mode = einfo->ei_mode;
2095 if (einfo->ei_mode == LCK_PR)
2098 match_flags |= LDLM_FL_LVB_READY;
2100 match_flags |= LDLM_FL_BLOCK_GRANTED;
2101 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2102 einfo->ei_type, policy, mode, &lockh, 0);
2104 struct ldlm_lock *matched;
2106 if (*flags & LDLM_FL_TEST_LOCK)
2109 matched = ldlm_handle2lock(&lockh);
2111 /* AGL enqueues DLM locks speculatively. Therefore if
2112 * it already exists a DLM lock, it will just inform the
2113 * caller to cancel the AGL process for this stripe. */
2114 ldlm_lock_decref(&lockh, mode);
2115 LDLM_LOCK_PUT(matched);
2117 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2118 *flags |= LDLM_FL_LVB_READY;
2120 /* We already have a lock, and it's referenced. */
2121 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2123 ldlm_lock_decref(&lockh, mode);
2124 LDLM_LOCK_PUT(matched);
2127 ldlm_lock_decref(&lockh, mode);
2128 LDLM_LOCK_PUT(matched);
2133 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2137 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2138 &RQF_LDLM_ENQUEUE_LVB);
2142 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2144 ptlrpc_request_free(req);
2148 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2150 ptlrpc_request_set_replen(req);
2153 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2154 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2156 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2157 sizeof(*lvb), LVB_T_OST, &lockh, async);
2160 struct osc_enqueue_args *aa;
2161 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2162 aa = ptlrpc_req_async_args(req);
2164 aa->oa_mode = einfo->ei_mode;
2165 aa->oa_type = einfo->ei_type;
2166 lustre_handle_copy(&aa->oa_lockh, &lockh);
2167 aa->oa_upcall = upcall;
2168 aa->oa_cookie = cookie;
2171 aa->oa_flags = flags;
2174 /* AGL is essentially to enqueue an DLM lock
2175 * in advance, so we don't care about the
2176 * result of AGL enqueue. */
2178 aa->oa_flags = NULL;
2181 req->rq_interpret_reply =
2182 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2183 if (rqset == PTLRPCD_SET)
2184 ptlrpcd_add_req(req);
2186 ptlrpc_set_add_req(rqset, req);
2187 } else if (intent) {
2188 ptlrpc_req_finished(req);
2193 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2196 ptlrpc_req_finished(req);
/*
 * Look for an already granted lock covering an extent, without enqueueing.
 *
 * @exp     export whose obd namespace is searched
 * @res_id  resource to match
 * @type    lock type
 * @policy  extent to match; modified in place so that it starts and ends on
 *          page boundaries
 * @mode    requested lock mode
 * @flags   LDLM flags; read into a local copy passed to ldlm_lock_match()
 * @data    value the matched lock's l_ast_data must accept (see
 *          osc_set_lock_data())
 * @lockh   receives the handle of the matched lock
 * @unref   passed through to ldlm_lock_match()
 *
 * When a lock is matched (and LDLM_FL_TEST_LOCK is not set) but
 * osc_set_lock_data() rejects @data, the reference obtained by the match is
 * dropped again with ldlm_lock_decref().
 *
 * NOTE(review): the mode adjustment preceding the match and all return
 * statements are not visible in this excerpt.
 */
2201 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2202 enum ldlm_type type, union ldlm_policy_data *policy,
2203 enum ldlm_mode mode, __u64 *flags, void *data,
2204 struct lustre_handle *lockh, int unref)
2206 struct obd_device *obd = exp->exp_obd;
2207 __u64 lflags = *flags;
2211 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2214 /* Filesystem lock extents are extended to page boundaries so that
2215 * dealing with the page cache is a little smoother */
2216 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2217 policy->l_extent.end |= ~PAGE_MASK;
2219 /* Next, search for already existing extent locks that will cover us */
2220 /* If we're trying to read, we also search for an existing PW lock. The
2221 * VFS and page cache already protect us locally, so lots of readers/
2222 * writers can share a single PW lock. */
2226 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2227 res_id, type, policy, rc, lockh, unref);
2228 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2232 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2234 LASSERT(lock != NULL);
2235 if (!osc_set_lock_data(lock, data)) {
2236 ldlm_lock_decref(lockh, rc);
2239 LDLM_LOCK_PUT(lock);
/*
 * Reply interpreter for asynchronous OST_STATFS requests (osc_statfs_async()
 * installs it as rq_interpret_reply).
 *
 * @env  execution environment (not referenced in the visible lines)
 * @req  the completed statfs request
 * @aa   async args; aa->aa_oi carries the flags, the destination buffer
 *       (oi_osfs) and the completion callback (oi_cb_up)
 * @rc   completion status of the request
 *
 * The obd_statfs from the reply is copied into *aa->aa_oi->oi_osfs, and
 * aa->aa_oi->oi_cb_up() is called with the final status.  A missing
 * reply buffer yields -EPROTO.
 *
 * NOTE(review): the conditions for the early exits, including what happens
 * for -ENOTCONN / -EAGAIN with OBD_STATFS_NODELAY, are not visible in this
 * excerpt.
 */
2244 static int osc_statfs_interpret(const struct lu_env *env,
2245 struct ptlrpc_request *req,
2246 struct osc_async_args *aa, int rc)
2248 struct obd_statfs *msfs;
2252 /* The request has in fact never been sent
2253 * due to issues at a higher level (LOV).
2254 * Exit immediately since the caller is
2255 * aware of the problem and takes care
2256 * of the clean up */
2259 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2260 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2266 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2268 GOTO(out, rc = -EPROTO);
2271 *aa->aa_oi->oi_osfs = *msfs;
2273 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Queue an asynchronous OST_STATFS request.
 *
 * @exp      export identifying the target
 * @oinfo    request descriptor; its oi_flags are checked for
 *           OBD_STATFS_NODELAY
 * @max_age  not referenced in the visible lines (see the comment below)
 * @rqset    request set the request is added to
 *
 * The request is allocated on the client import, packed as OST_STATFS, sent
 * to OST_CREATE_PORTAL and completed by osc_statfs_interpret().  With
 * OBD_STATFS_NODELAY the request is marked no-resend and no-delay.
 *
 * NOTE(review): the allocation-failure checks, the setup of the async args
 * and the return statement are not visible in this excerpt.
 */
2277 static int osc_statfs_async(struct obd_export *exp,
2278 struct obd_info *oinfo, __u64 max_age,
2279 struct ptlrpc_request_set *rqset)
2281 struct obd_device *obd = class_exp2obd(exp);
2282 struct ptlrpc_request *req;
2283 struct osc_async_args *aa;
2287 /* We could possibly pass max_age in the request (as an absolute
2288 * timestamp or a "seconds.usec ago") so the target can avoid doing
2289 * extra calls into the filesystem if that isn't necessary (e.g.
2290 * during mount that would help a bit). Having relative timestamps
2291 * is not so great if request processing is slow, while absolute
2292 * timestamps are not ideal because they need time synchronization. */
2293 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2297 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2299 ptlrpc_request_free(req);
2302 ptlrpc_request_set_replen(req);
2303 req->rq_request_portal = OST_CREATE_PORTAL;
2304 ptlrpc_at_set_req_timeout(req);
2306 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2307 /* procfs requests not want stat in wait for avoid deadlock */
2308 req->rq_no_resend = 1;
2309 req->rq_no_delay = 1;
2312 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2313 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2314 aa = ptlrpc_req_async_args(req);
2317 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: send the request and wait for the reply.
 *
 * @env      execution environment (not referenced in the visible lines)
 * @exp      export identifying the target
 * @osfs     result buffer (the copy into it is not visible in this excerpt)
 * @max_age  not referenced in the visible lines (see the comment below)
 * @flags    OBD_STATFS_NODELAY marks the request no-resend and no-delay
 *
 * A reference on the client import is taken under cl_sem (read side) so the
 * import cannot go away underneath the request; that reference is dropped
 * with class_import_put() once the request has been allocated.  The request
 * is sent with ptlrpc_queue_wait(); a missing reply buffer yields -EPROTO,
 * and the request is released with ptlrpc_req_finished().
 *
 * NOTE(review): the error checks and the return statement are not visible in
 * this excerpt.
 */
2321 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2322 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2324 struct obd_device *obd = class_exp2obd(exp);
2325 struct obd_statfs *msfs;
2326 struct ptlrpc_request *req;
2327 struct obd_import *imp = NULL;
2331 /*Since the request might also come from lprocfs, so we need
2332 *sync this with client_disconnect_export Bug15684*/
2333 down_read(&obd->u.cli.cl_sem);
2334 if (obd->u.cli.cl_import)
2335 imp = class_import_get(obd->u.cli.cl_import);
2336 up_read(&obd->u.cli.cl_sem);
2340 /* We could possibly pass max_age in the request (as an absolute
2341 * timestamp or a "seconds.usec ago") so the target can avoid doing
2342 * extra calls into the filesystem if that isn't necessary (e.g.
2343 * during mount that would help a bit). Having relative timestamps
2344 * is not so great if request processing is slow, while absolute
2345 * timestamps are not ideal because they need time synchronization. */
2346 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2348 class_import_put(imp);
2353 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2355 ptlrpc_request_free(req);
2358 ptlrpc_request_set_replen(req);
2359 req->rq_request_portal = OST_CREATE_PORTAL;
2360 ptlrpc_at_set_req_timeout(req);
2362 if (flags & OBD_STATFS_NODELAY) {
2363 /* procfs requests not want stat in wait for avoid deadlock */
2364 req->rq_no_resend = 1;
2365 req->rq_no_delay = 1;
2368 rc = ptlrpc_queue_wait(req);
2372 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2374 GOTO(out, rc = -EPROTO);
2381 ptlrpc_req_finished(req);
/*
 * ioctl dispatcher for the OSC device.
 *
 * @cmd   ioctl command
 * @exp   export the ioctl was issued on
 * @len   not referenced in the visible lines
 * @karg  kernel copy of the argument, interpreted as struct obd_ioctl_data
 * @uarg  user-space argument pointer (not referenced in the visible lines)
 *
 * A module reference is held for the duration of the call
 * (try_module_get() / module_put()).  Handled commands:
 *   OBD_IOC_CLIENT_RECOVER - ptlrpc_recover_import() using data->ioc_inlbuf1
 *   IOC_OSC_SET_ACTIVE     - ptlrpc_set_import_active()
 *   OBD_IOC_PING_TARGET    - ptlrpc_obd_ping()
 * Any other command is logged and fails with -ENOTTY.
 *
 * NOTE(review): the switch statement itself, the value returned when the
 * module reference cannot be taken, and the final return are not visible in
 * this excerpt.
 */
2385 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2386 void *karg, void __user *uarg)
2388 struct obd_device *obd = exp->exp_obd;
2389 struct obd_ioctl_data *data = karg;
2393 if (!try_module_get(THIS_MODULE)) {
2394 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2395 module_name(THIS_MODULE));
2399 case OBD_IOC_CLIENT_RECOVER:
2400 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2401 data->ioc_inlbuf1, 0);
2405 case IOC_OSC_SET_ACTIVE:
2406 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2409 case OBD_IOC_PING_TARGET:
2410 err = ptlrpc_obd_ping(obd);
2413 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2414 cmd, current_comm());
2415 GOTO(out, err = -ENOTTY);
2418 module_put(THIS_MODULE);
/*
 * Set a key/value parameter on this OSC, either locally or by forwarding it
 * to the OST.
 *
 * Keys handled locally (visible lines):
 *  - KEY_CHECKSUM: \a vallen is compared with sizeof(int); cl_checksum is set
 *    to 1 or 0 depending on whether *(int *)val is non-zero;
 *  - KEY_SPTLRPC_CONF: sptlrpc_conf_client_adapt();
 *  - KEY_FLUSH_CTX: sptlrpc_import_flush_my_ctx() on the export's import;
 *  - KEY_CACHE_SET: attaches the cl_client_cache passed in \a val to this
 *    client (asserted to happen only once), takes a reference on it and
 *    links the client into the cache's ccc_lru list under ccc_lru_lock;
 *  - KEY_CACHE_LRU_SHRINK: calls osc_lru_shrink() for at most
 *    min(half of cl_lru_in_list, *(long *)val) entries.
 *
 * Every other key is packed into an OST_SET_INFO request. KEY_GRANT_SHRINK
 * uses the RQF_OST_SET_GRANT_INFO format, gets osc_shrink_grant_interpret as
 * reply interpreter and is handed to ptlrpcd_add_req(); all other keys are
 * added to \a set (asserted non-NULL) followed by ptlrpc_check_set().
 *
 * NOTE(review): local declarations (rc, tmp, oa), the early returns after
 * each locally handled key, the allocation/pack error checks and the final
 * return are on lines missing from this listing. No \a vallen check is
 * visible for KEY_CACHE_LRU_SHRINK before \a val is read as a long - confirm
 * against the callers.
 */
2422 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2423 u32 keylen, void *key,
2424 u32 vallen, void *val,
2425 struct ptlrpc_request_set *set)
2427 struct ptlrpc_request *req;
2428 struct obd_device *obd = exp->exp_obd;
2429 struct obd_import *imp = class_exp2cliimp(exp);
/* fault-injection hook for OBD_FAIL_OSC_SHUTDOWN */
2434 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2436 if (KEY_IS(KEY_CHECKSUM)) {
2437 if (vallen != sizeof(int))
/* normalise the caller's int to 0/1 */
2439 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2443 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2444 sptlrpc_conf_client_adapt(obd);
2448 if (KEY_IS(KEY_FLUSH_CTX)) {
2449 sptlrpc_import_flush_my_ctx(imp);
2453 if (KEY_IS(KEY_CACHE_SET)) {
2454 struct client_obd *cli = &obd->u.cli;
2456 LASSERT(cli->cl_cache == NULL); /* only once */
2457 cli->cl_cache = (struct cl_client_cache *)val;
2458 cl_cache_incref(cli->cl_cache);
2459 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2461 /* add this osc into entity list */
2462 LASSERT(list_empty(&cli->cl_lru_osc));
2463 spin_lock(&cli->cl_cache->ccc_lru_lock);
2464 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2465 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2470 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2471 struct client_obd *cli = &obd->u.cli;
/* never try to drop more than half of the entries currently on the LRU */
2472 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2473 long target = *(long *)val;
2475 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* only KEY_GRANT_SHRINK may be issued without a request set */
2480 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2483 /* We pass all other commands directly to OST. Since nobody calls osc
2484 methods directly and everybody is supposed to go through LOV, we
2485 assume lov checked invalid values for us.
2486 The only recognised values so far are evict_by_nid and mds_conn.
2487 Even if something bad goes through, we'd get a -EINVAL from OST
2490 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2491 &RQF_OST_SET_GRANT_INFO :
2496 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2497 RCL_CLIENT, keylen);
2498 if (!KEY_IS(KEY_GRANT_SHRINK))
2499 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2500 RCL_CLIENT, vallen);
2501 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2503 ptlrpc_request_free(req);
2507 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2508 memcpy(tmp, key, keylen);
2509 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2512 memcpy(tmp, val, vallen);
2514 if (KEY_IS(KEY_GRANT_SHRINK)) {
2515 struct osc_grant_args *aa;
2518 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2519 aa = ptlrpc_req_async_args(req);
2522 ptlrpc_req_finished(req);
2525 *oa = ((struct ost_body *)val)->oa;
2527 req->rq_interpret_reply = osc_shrink_grant_interpret;
2530 ptlrpc_request_set_replen(req);
2531 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2532 LASSERT(set != NULL);
2533 ptlrpc_set_add_req(set, req);
2534 ptlrpc_check_set(NULL, set);
2536 ptlrpcd_add_req(req);
/*
 * Reconnect hook: report the grant this client currently holds to the server.
 *
 * Only acts when \a data is non-NULL and OBD_CONNECT_GRANT is set in its
 * connect flags. Under cl_loi_list_lock the grant is computed as
 * cl_avail_grant + cl_reserved_grant plus the dirty amount: cl_dirty_grant
 * when OBD_CONNECT_GRANT_PARAM is set, and cl_dirty_pages converted to bytes
 * with PAGE_CACHE_SHIFT (presumably in an else branch whose keyword line is
 * missing from this listing). A zero result is replaced by
 * 2 * cli_brw_size(obd). cl_lost_grant is read for the debug message and
 * then reset to 0.
 *
 * NOTE(review): the last parameter line of the signature, the declarations
 * of grant/lost_grant and the return statement are not visible here.
 */
2542 static int osc_reconnect(const struct lu_env *env,
2543 struct obd_export *exp, struct obd_device *obd,
2544 struct obd_uuid *cluuid,
2545 struct obd_connect_data *data,
2548 struct client_obd *cli = &obd->u.cli;
2550 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2554 spin_lock(&cli->cl_loi_list_lock);
2555 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2556 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2557 grant += cli->cl_dirty_grant;
2559 grant += cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
/* never report zero: fall back to twice the bulk RPC size */
2560 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
/* remember the lost grant for the message below, then clear it */
2561 lost_grant = cli->cl_lost_grant;
2562 cli->cl_lost_grant = 0;
2563 spin_unlock(&cli->cl_loi_list_lock);
2565 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2566 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2567 data->ocd_version, data->ocd_grant, lost_grant);
/*
 * Disconnect an export from the OST.
 *
 * Calls client_disconnect_export() first and only afterwards, if the client
 * import pointer is NULL, removes this client from the grant-shrink list
 * with osc_del_shrink_grant(). The narrative below (its comment delimiters
 * are missing from this listing) explains why the removal has to come after
 * the disconnect.
 *
 * NOTE(review): the declaration of rc and the return statement are not
 * visible here.
 */
2573 static int osc_disconnect(struct obd_export *exp)
2575 struct obd_device *obd = class_exp2obd(exp);
2578 rc = client_disconnect_export(exp);
2580 * Initially we put del_shrink_grant before disconnect_export, but it
2581 * causes the following problem if setup (connect) and cleanup
2582 * (disconnect) are tangled together.
2583 * connect p1 disconnect p2
2584 * ptlrpc_connect_import
2585 * ............... class_manual_cleanup
2588 * ptlrpc_connect_interrupt
2590 * add this client to shrink list
2592 * Bang! pinger trigger the shrink.
2593 * So the osc should be disconnected from the shrink list, after we
2594 * are sure the import has been destroyed. BUG18662
2596 if (obd->u.cli.cl_import == NULL)
2597 osc_del_shrink_grant(&obd->u.cli);
/*
 * Hash-iteration callback applied to each LDLM resource of the namespace
 * (passed to cfs_hash_for_each_nolock() from osc_import_event()).
 *
 * Walks the resource's granted locks. The first lock with a non-NULL
 * l_ast_data supplies the osc_object, on which a reference is taken. The
 * object is then invalidated with osc_object_invalidate() and the reference
 * is dropped.
 *
 * \a arg is the lu_env to use. \a bd is not referenced by any visible line.
 *
 * NOTE(review): the guard around the final invalidate/put (presumably
 * "osc != NULL"), any locking of the resource and the return value are on
 * lines missing from this listing.
 */
2601 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2602 struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2604 struct lu_env *env = arg;
2605 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2606 struct ldlm_lock *lock;
2607 struct osc_object *osc = NULL;
2611 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
/* pick up the object from the first lock that carries one */
2612 if (lock->l_ast_data != NULL && osc == NULL) {
2613 osc = lock->l_ast_data;
2614 cl_object_get(osc2cl(osc));
2617 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2618 * by the 2nd round of ldlm_namespace_clean() call in
2619 * osc_import_event(). */
2620 ldlm_clear_cleaned(lock);
2625 osc_object_invalidate(env, osc);
/* balances the cl_object_get() taken in the loop */
2626 cl_object_put(env, osc2cl(osc));
/*
 * React to a state change of the client import.
 *
 * Visible handling per event:
 *  - IMP_EVENT_DISCON: zero cl_avail_grant and cl_lost_grant under
 *    cl_loi_list_lock;
 *  - IMP_EVENT_INACTIVE / ACTIVE / DEACTIVATE / ACTIVATE: forward the
 *    matching OBD_NOTIFY_* to the observer with obd_notify_observer();
 *  - IMP_EVENT_INVALIDATE: clean the LDLM namespace (LDLM_FL_LOCAL_ONLY),
 *    unplug pending I/O, run osc_ldlm_resource_invalidate() over every
 *    resource in ns_rs_hash, then clean the namespace a second time;
 *  - IMP_EVENT_OCD: initialise grant from the connect data when
 *    OBD_CONNECT_GRANT is set, switch the request portal to
 *    OST_REQUEST_PORTAL when OBD_CONNECT_REQPORTAL is set, then notify the
 *    observer with OBD_NOTIFY_OCD;
 *  - anything else: CERROR "Unknown import event".
 *
 * Asserts that \a imp belongs to \a obd.
 *
 * NOTE(review): the switch statement, the assignment of cli, the
 * declarations of rc/env/refcheck, the error check after cl_env_get(), the
 * break statements and the return are on lines missing from this listing.
 */
2632 static int osc_import_event(struct obd_device *obd,
2633 struct obd_import *imp,
2634 enum obd_import_event event)
2636 struct client_obd *cli;
2640 LASSERT(imp->imp_obd == obd);
2643 case IMP_EVENT_DISCON: {
2645 spin_lock(&cli->cl_loi_list_lock);
2646 cli->cl_avail_grant = 0;
2647 cli->cl_lost_grant = 0;
2648 spin_unlock(&cli->cl_loi_list_lock);
2651 case IMP_EVENT_INACTIVE: {
2652 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2655 case IMP_EVENT_INVALIDATE: {
2656 struct ldlm_namespace *ns = obd->obd_namespace;
/* first cleanup pass over the namespace */
2660 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2662 env = cl_env_get(&refcheck);
2664 osc_io_unplug(env, &obd->u.cli, NULL);
2666 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2667 osc_ldlm_resource_invalidate,
2669 cl_env_put(env, &refcheck);
/* second pass: the callback above cleared the "cleaned" flag on the locks */
2671 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2676 case IMP_EVENT_ACTIVE: {
2677 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2680 case IMP_EVENT_OCD: {
2681 struct obd_connect_data *ocd = &imp->imp_connect_data;
2683 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2684 osc_init_grant(&obd->u.cli, ocd);
2687 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2688 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2690 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2693 case IMP_EVENT_DEACTIVATE: {
2694 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2697 case IMP_EVENT_ACTIVATE: {
2698 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2702 CERROR("Unknown import event %d\n", event);
/*
 * Cancel-weight callback of the OSC; osc_setup() installs it on the
 * namespace with ns_register_cancel().
 *
 * The visible condition selects locks that are on an LDLM_EXTENT resource,
 * are granted (l_granted_mode == l_req_mode) and for which
 * osc_ldlm_weigh_ast() returns 0.
 *
 * NOTE(review): the return statements and the delimiters of the original
 * doc comment below are on lines missing from this listing; the return
 * values are described only by that original text.
 */
2709 * Determine whether the lock can be canceled before replaying the lock
2710 * during recovery, see bug16774 for detailed information.
2712 * \retval zero the lock can't be canceled
2713 * \retval other ok to cancel
2715 static int osc_cancel_weight(struct ldlm_lock *lock)
2718 * Cancel all unused and granted extent lock.
2720 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2721 lock->l_granted_mode == lock->l_req_mode &&
2722 osc_ldlm_weigh_ast(lock) == 0)
/*
 * ptlrpcd work callback for writeback; osc_setup() registers it through
 * ptlrpcd_alloc_work() with the client_obd as \a data.
 *
 * Logs at D_CACHE and calls osc_io_unplug() for the client.
 *
 * NOTE(review): the return statement is on a line missing from this listing.
 */
2728 static int brw_queue_work(const struct lu_env *env, void *data)
2730 struct client_obd *cli = data;
2732 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2734 osc_io_unplug(env, cli, NULL);
/*
 * Set up an OSC device.
 *
 * Visible sequence:
 *  1. take a ptlrpcd reference and run client_obd_setup();
 *  2. allocate the two ptlrpcd work items (brw_queue_work for writeback,
 *     lru_queue_work for the LRU) and store them in the client_obd;
 *  3. osc_quota_setup() and the default grant-shrink interval;
 *  4. with CONFIG_PROC_FS: register the proc entry, under the OSP type's
 *     typ_procsym when that type exists, otherwise via lprocfs_obd_setup(),
 *     then attach the seqstat/sptlrpc/ptlrpc proc files;
 *  5. grow the shared request pool by cl_max_rpcs_in_flight + 2 requests,
 *     capped so the total does not exceed osc_reqpool_maxreqcount;
 *  6. initialise the grant-shrink list head, register osc_cancel_weight on
 *     the namespace and append the client to the global osc_shrink_list.
 *
 * The trailing lines are the error unwinding: destroy whichever work items
 * exist, then client_obd_cleanup().
 *
 * NOTE(review): declarations (rc, handler, adding, added, req_count), most
 * "if (rc)" checks, the goto labels, the #endif and the return statements
 * are on lines missing from this listing, so the mapping between failure
 * points and unwinding labels cannot be verified from here.
 */
2738 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2740 struct client_obd *cli = &obd->u.cli;
2741 struct obd_type *type;
2749 rc = ptlrpcd_addref();
2753 rc = client_obd_setup(obd, lcfg);
2755 GOTO(out_ptlrpcd, rc);
/* work item that pushes out queued writeback I/O */
2757 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2758 if (IS_ERR(handler))
2759 GOTO(out_client_setup, rc = PTR_ERR(handler));
2760 cli->cl_writeback_work = handler;
/* work item running lru_queue_work */
2762 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2763 if (IS_ERR(handler))
2764 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2765 cli->cl_lru_work = handler;
2767 rc = osc_quota_setup(obd);
2769 GOTO(out_ptlrpcd_work, rc);
2771 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2773 #ifdef CONFIG_PROC_FS
2774 obd->obd_vars = lprocfs_osc_obd_vars;
2776 /* If this is true then both client (osc) and server (osp) are on the
2777 * same node. The osp layer if loaded first will register the osc proc
2778 * directory. In that case this obd_device will be attached its proc
2779 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2780 type = class_search_type(LUSTRE_OSP_NAME);
2781 if (type && type->typ_procsym) {
2782 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2784 obd->obd_vars, obd);
2785 if (IS_ERR(obd->obd_proc_entry)) {
2786 rc = PTR_ERR(obd->obd_proc_entry);
2787 CERROR("error %d setting up lprocfs for %s\n", rc,
/* keep going without a proc entry */
2789 obd->obd_proc_entry = NULL;
2792 rc = lprocfs_obd_setup(obd);
2795 /* If the basic OSC proc tree construction succeeded then
2796 * lets do the rest. */
2798 lproc_osc_attach_seqstat(obd);
2799 sptlrpc_lprocfs_cliobd_attach(obd);
2800 ptlrpc_lprocfs_register_obd(obd);
2804 * We try to control the total number of requests with a upper limit
2805 * osc_reqpool_maxreqcount. There might be some race which will cause
2806 * over-limit allocation, but it is fine.
2808 req_count = atomic_read(&osc_pool_req_count);
2809 if (req_count < osc_reqpool_maxreqcount) {
2810 adding = cli->cl_max_rpcs_in_flight + 2;
/* clamp so the pool total stays within osc_reqpool_maxreqcount */
2811 if (req_count + adding > osc_reqpool_maxreqcount)
2812 adding = osc_reqpool_maxreqcount - req_count;
/* account for what was actually added, which may differ from the request */
2814 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2815 atomic_add(added, &osc_pool_req_count);
2818 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2819 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
/* make this client visible on the global osc_shrink_list */
2821 spin_lock(&osc_shrink_lock);
2822 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2823 spin_unlock(&osc_shrink_lock);
/* error unwinding starts here (labels not visible in this listing) */
2828 if (cli->cl_writeback_work != NULL) {
2829 ptlrpcd_destroy_work(cli->cl_writeback_work);
2830 cli->cl_writeback_work = NULL;
2832 if (cli->cl_lru_work != NULL) {
2833 ptlrpcd_destroy_work(cli->cl_lru_work);
2834 cli->cl_lru_work = NULL;
2837 client_obd_cleanup(obd);
/*
 * First cleanup stage of an OSC device.
 *
 * Waits for zombie exports with obd_zombie_barrier(), destroys the writeback
 * and LRU ptlrpcd work items if they exist (clearing the pointers), cleans
 * up the client import and unregisters the device's proc entries.
 *
 * NOTE(review): the return statement and the delimiters of the explanatory
 * comment below are on lines missing from this listing.
 */
2843 static int osc_precleanup(struct obd_device *obd)
2845 struct client_obd *cli = &obd->u.cli;
2849 * for echo client, export may be on zombie list, wait for
2850 * zombie thread to cull it, because cli.cl_import will be
2851 * cleared in client_disconnect_export():
2852 * class_export_destroy() -> obd_cleanup() ->
2853 * echo_device_free() -> echo_client_cleanup() ->
2854 * obd_disconnect() -> osc_disconnect() ->
2855 * client_disconnect_export()
2857 obd_zombie_barrier();
2858 if (cli->cl_writeback_work) {
2859 ptlrpcd_destroy_work(cli->cl_writeback_work);
2860 cli->cl_writeback_work = NULL;
2863 if (cli->cl_lru_work) {
2864 ptlrpcd_destroy_work(cli->cl_lru_work);
2865 cli->cl_lru_work = NULL;
2868 obd_cleanup_client_import(obd);
2869 ptlrpc_lprocfs_unregister_obd(obd);
2870 lprocfs_obd_cleanup(obd);
/*
 * Final cleanup stage of an OSC device.
 *
 * Removes the client from the global osc_shrink_list, detaches it from the
 * shared client cache if one was attached (unlinks cl_lru_osc under
 * ccc_lru_lock, clears cl_lru_left, drops the cache reference), frees the
 * quota cache and finishes with client_obd_cleanup().
 *
 * NOTE(review): the declaration of rc, anything between client_obd_cleanup()
 * and the end of the function, and the return statement are on lines missing
 * from this listing.
 */
2874 int osc_cleanup(struct obd_device *obd)
2876 struct client_obd *cli = &obd->u.cli;
/* undo the list_add_tail() done in osc_setup() */
2881 spin_lock(&osc_shrink_lock);
2882 list_del(&cli->cl_shrink_list);
2883 spin_unlock(&osc_shrink_lock);
2886 if (cli->cl_cache != NULL) {
2887 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2888 spin_lock(&cli->cl_cache->ccc_lru_lock);
2889 list_del_init(&cli->cl_lru_osc);
2890 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2891 cli->cl_lru_left = NULL;
/* drop the reference held on the cache and forget the pointer */
2892 cl_cache_decref(cli->cl_cache);
2893 cli->cl_cache = NULL;
2896 /* free memory of osc quota cache */
2897 osc_quota_cleanup(obd);
2899 rc = client_obd_cleanup(obd);
/*
 * Apply a configuration record to this OSC's proc parameters.
 *
 * Hands \a lcfg to class_process_proc_param() with the PARAM_OSC prefix and
 * the device's obd_vars table. A positive result is folded to 0; zero and
 * negative results are returned unchanged.
 */
2905 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2907 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2908 return rc > 0 ? 0: rc;
/*
 * o_process_config method: thin wrapper that passes \a buf on to
 * osc_process_config_base(). \a len is not used by the visible line.
 */
2911 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2913 return osc_process_config_base(obd, buf);
/*
 * Method table of the OSC obd type; osc_init() passes it to
 * class_register_type(). Connection add/remove/connect use the generic
 * client_* import helpers; the remaining entries point at osc_* functions.
 * (The closing line of the initializer is missing from this listing.)
 */
2916 static struct obd_ops osc_obd_ops = {
2917 .o_owner = THIS_MODULE,
2918 .o_setup = osc_setup,
2919 .o_precleanup = osc_precleanup,
2920 .o_cleanup = osc_cleanup,
2921 .o_add_conn = client_import_add_conn,
2922 .o_del_conn = client_import_del_conn,
2923 .o_connect = client_connect_import,
2924 .o_reconnect = osc_reconnect,
2925 .o_disconnect = osc_disconnect,
2926 .o_statfs = osc_statfs,
2927 .o_statfs_async = osc_statfs_async,
2928 .o_create = osc_create,
2929 .o_destroy = osc_destroy,
2930 .o_getattr = osc_getattr,
2931 .o_setattr = osc_setattr,
2932 .o_iocontrol = osc_iocontrol,
2933 .o_set_info_async = osc_set_info_async,
2934 .o_import_event = osc_import_event,
2935 .o_process_config = osc_process_config,
2936 .o_quotactl = osc_quotactl,
/*
 * Cache-shrinker state of the module:
 *  - osc_cache_shrinker: handle returned by set_shrinker() in osc_init() and
 *    passed to remove_shrinker() in osc_exit();
 *  - osc_shrink_list: list of client_obd entries (linked via cl_shrink_list
 *    in osc_setup()/osc_cleanup());
 *  - osc_shrink_lock: spinlock taken around updates of osc_shrink_list.
 */
2939 static struct shrinker *osc_cache_shrinker;
2940 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
2941 DEFINE_SPINLOCK(osc_shrink_lock);
/*
 * Single-callback shrinker entry point, compiled only when
 * HAVE_SHRINKER_COUNT is not defined.
 *
 * Builds a struct shrink_control from the arguments, runs
 * osc_cache_shrink_scan() (result ignored) and returns
 * osc_cache_shrink_count(). When neither HAVE_SHRINKER_WANT_SHRINK_PTR nor
 * HAVE_SHRINK_CONTROL is defined, a local NULL shrinker pointer is declared
 * for the two calls.
 *
 * NOTE(review): the matching #endif lines are missing from this listing.
 */
2943 #ifndef HAVE_SHRINKER_COUNT
2944 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2946 struct shrink_control scv = {
2947 .nr_to_scan = shrink_param(sc, nr_to_scan),
2948 .gfp_mask = shrink_param(sc, gfp_mask)
2950 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2951 struct shrinker *shrinker = NULL;
2954 (void)osc_cache_shrink_scan(shrinker, &scv);
2956 return osc_cache_shrink_count(shrinker, &scv);
/*
 * Module initialisation.
 *
 * Visible sequence: initialise the osc kmem caches, register the OSC obd
 * type (with proc support disabled when the OSP type already owns a
 * typ_procsym), install the cache shrinker, validate osc_reqpool_mem_max,
 * derive osc_reqpool_maxreqcount from it and create the shared request pool.
 * The trailing two calls are the failure unwinding (unregister the type,
 * free the kmem caches).
 *
 * NOTE(review): no remove_shrinker() call is visible on the failure path,
 * although set_shrinker() has already run when the osc_reqpool_mem_max check
 * or the pool allocation fails. If it is indeed absent, the shrinker stays
 * registered after a failed module load - confirm and add the call if so.
 *
 * NOTE(review): the declaration of rc, the "if (rc)" checks, the initial
 * value of reqsize, the goto labels and the return are on lines missing from
 * this listing.
 */
2960 static int __init osc_init(void)
2962 bool enable_proc = true;
2963 struct obd_type *type;
2964 unsigned int reqpool_size;
2965 unsigned int reqsize;
2967 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
2968 osc_cache_shrink_count, osc_cache_shrink_scan);
2971 /* print an address of _any_ initialized kernel symbol from this
2972 * module, to allow debugging with gdb that doesn't support data
2973 * symbols from modules.*/
2974 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2976 rc = lu_kmem_init(osc_caches);
/* if OSP has already registered its proc symlink, do not create ours */
2980 type = class_search_type(LUSTRE_OSP_NAME);
2981 if (type != NULL && type->typ_procsym != NULL)
2982 enable_proc = false;
2984 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2985 LUSTRE_OSC_NAME, &osc_device_type);
2989 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
2991 /* This is obviously too much memory, only prevent overflow here */
2992 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
2993 GOTO(out_type, rc = -EINVAL);
/* megabytes to bytes; the range check above keeps this shift from overflowing */
2995 reqpool_size = osc_reqpool_mem_max << 20;
/* double reqsize until it reaches at least OST_IO_MAXREQSIZE */
2998 while (reqsize < OST_IO_MAXREQSIZE)
2999 reqsize = reqsize << 1;
3002 * We don't enlarge the request count in OSC pool according to
3003 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3004 * tried after normal allocation failed. So a small OSC pool won't
3005 * cause much performance degression in most of cases.
3007 osc_reqpool_maxreqcount = reqpool_size / reqsize;
/* the pool starts empty; osc_setup() adds requests per device */
3009 atomic_set(&osc_pool_req_count, 0);
3010 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3011 ptlrpc_add_rqs_to_pool);
3013 if (osc_rq_pool != NULL)
3017 class_unregister_type(LUSTRE_OSC_NAME);
3019 lu_kmem_fini(osc_caches);
/*
 * Module teardown: remove the cache shrinker, unregister the OSC obd type,
 * release the osc kmem caches and free the shared request pool.
 */
3024 static void __exit osc_exit(void)
3026 remove_shrinker(osc_cache_shrinker);
3027 class_unregister_type(LUSTRE_OSC_NAME);
3028 lu_kmem_fini(osc_caches);
3029 ptlrpc_free_rq_pool(osc_rq_pool);
/* Kernel module metadata and the init/exit entry points of the OSC module. */
3032 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3033 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3034 MODULE_VERSION(LUSTRE_VERSION_STRING);
3035 MODULE_LICENSE("GPL");
3037 module_init(osc_init);
3038 module_exit(osc_exit);