4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2015, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
41 #include <lustre/lustre_user.h>
43 #include <lprocfs_status.h>
44 #include <lustre_debug.h>
45 #include <lustre_dlm.h>
46 #include <lustre_fid.h>
47 #include <lustre_ha.h>
48 #include <lustre_ioctl.h>
49 #include <lustre_net.h>
50 #include <lustre_obdo.h>
51 #include <lustre_param.h>
53 #include <obd_cksum.h>
54 #include <obd_class.h>
56 #include "osc_cl_internal.h"
57 #include "osc_internal.h"
/*
 * Module-scope state for the shared OSC request pool, plus the
 * per-RPC-type "async args" structures that are stashed in
 * ptlrpc_request::rq_async_args and read back by the reply
 * interpreters below.
 *
 * NOTE(review): this listing has interior lines (braces and some
 * struct fields) elided by extraction; fields shown are a subset.
 */
59 atomic_t osc_pool_req_count;
60 unsigned int osc_reqpool_maxreqcount;
61 struct ptlrpc_request_pool *osc_rq_pool;
63 /* max memory used for request pool, unit is MB */
64 static unsigned int osc_reqpool_mem_max = 5;
65 module_param(osc_reqpool_mem_max, uint, 0444);
/* Per-BRW async state: page array, owning client, oap/extent lists. */
67 struct osc_brw_async_args {
73 struct brw_page **aa_ppga;
74 struct client_obd *aa_cli;
75 struct list_head aa_oaps;
76 struct list_head aa_exts;
/* grant-shrink RPCs reuse the BRW async-args layout (see
 * osc_shrink_grant_interpret below) */
79 #define osc_grant_args osc_brw_async_args
/* async setattr/punch completion state: upcall + cookie */
81 struct osc_setattr_args {
83 obd_enqueue_update_f sa_upcall;
/* async OST_SYNC completion state */
87 struct osc_fsync_args {
88 struct osc_object *fa_obj;
90 obd_enqueue_update_f fa_upcall;
/* async OST_LADVISE completion state */
94 struct osc_ladvise_args {
96 obd_enqueue_update_f la_upcall;
/* DLM enqueue completion state (export, lock type/mode, lvb, handle) */
100 struct osc_enqueue_args {
101 struct obd_export *oa_exp;
102 enum ldlm_type oa_type;
103 enum ldlm_mode oa_mode;
105 osc_enqueue_upcall_f oa_upcall;
107 struct ost_lvb *oa_lvb;
108 struct lustre_handle oa_lockh;
109 unsigned int oa_agl:1;
/* forward declarations for helpers defined later in the file */
112 static void osc_release_ppga(struct brw_page **ppga, size_t count);
113 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/*
 * Pack @oa into the OST_BODY field of @req's request capsule,
 * converting the obdo to wire format (lustre_set_wire_obdo) using the
 * import's negotiated connect data.
 */
116 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
118 struct ost_body *body;
120 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
123 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/*
 * Synchronous OST_GETATTR: allocate and pack the request, send it with
 * ptlrpc_queue_wait(), then unpack the returned obdo from the reply.
 * On success also publishes the client BRW size via o_blksize and sets
 * OBD_MD_FLBLKSZ.  Error-handling lines are elided in this listing.
 */
126 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
129 struct ptlrpc_request *req;
130 struct ost_body *body;
134 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
138 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* pack failed: free the half-built request before returning */
140 ptlrpc_request_free(req);
144 osc_pack_req_body(req, oa);
146 ptlrpc_request_set_replen(req);
148 rc = ptlrpc_queue_wait(req);
152 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* reply did not contain a valid ost_body */
154 GOTO(out, rc = -EPROTO);
156 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
157 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
159 oa->o_blksize = cli_brw_size(exp->exp_obd);
160 oa->o_valid |= OBD_MD_FLBLKSZ;
164 ptlrpc_req_finished(req);
/*
 * Synchronous OST_SETATTR: mirror of osc_getattr() for attribute
 * updates.  Requires the caller to have set the object group
 * (OBD_MD_FLGROUP) in @oa.  Unpacks the server's updated obdo back
 * into @oa on success.
 */
169 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
172 struct ptlrpc_request *req;
173 struct ost_body *body;
177 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
179 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
183 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* pack failed: release the request */
185 ptlrpc_request_free(req);
189 osc_pack_req_body(req, oa);
191 ptlrpc_request_set_replen(req);
193 rc = ptlrpc_queue_wait(req);
197 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
199 GOTO(out, rc = -EPROTO);
201 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
205 ptlrpc_req_finished(req);
/*
 * Reply interpreter shared by async setattr and punch: unpack the
 * server's obdo into sa->sa_oa, then invoke the caller's upcall with
 * the final rc.  Called from ptlrpcd context.
 */
210 static int osc_setattr_interpret(const struct lu_env *env,
211 struct ptlrpc_request *req,
212 struct osc_setattr_args *sa, int rc)
214 struct ost_body *body;
220 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
222 GOTO(out, rc = -EPROTO);
224 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
227 rc = sa->sa_upcall(sa->sa_cookie, rc);
/*
 * Asynchronous OST_SETATTR (used for MDS->OST setattr): pack the
 * request, record @upcall/@cookie in osc_setattr_args, and hand the
 * request either to ptlrpcd or to the caller-supplied @rqset.
 *
 * NOTE(review): the conditionals around the two ptlrpcd_add_req()
 * calls (rqset == NULL vs rqset == PTLRPCD_SET) are elided in this
 * listing — only one path runs per call; confirm against full source.
 */
231 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
232 obd_enqueue_update_f upcall, void *cookie,
233 struct ptlrpc_request_set *rqset)
235 struct ptlrpc_request *req;
236 struct osc_setattr_args *sa;
241 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
245 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
247 ptlrpc_request_free(req);
251 osc_pack_req_body(req, oa);
253 ptlrpc_request_set_replen(req);
255 /* do mds to ost setattr asynchronously */
257 /* Do not wait for response. */
258 ptlrpcd_add_req(req);
260 req->rq_interpret_reply =
261 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* stash completion state in the request's async-args area */
263 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
264 sa = ptlrpc_req_async_args(req);
266 sa->sa_upcall = upcall;
267 sa->sa_cookie = cookie;
269 if (rqset == PTLRPCD_SET)
270 ptlrpcd_add_req(req);
272 ptlrpc_set_add_req(rqset, req);
/*
 * Reply interpreter for OST_LADVISE: copy the server's obdo back to
 * the caller's buffer and fire the completion upcall.
 */
278 static int osc_ladvise_interpret(const struct lu_env *env,
279 struct ptlrpc_request *req,
282 struct osc_ladvise_args *la = arg;
283 struct ost_body *body;
289 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291 GOTO(out, rc = -EPROTO);
/* struct copy of the reply obdo into the caller-owned obdo */
293 *la->la_oa = body->oa;
295 rc = la->la_upcall(la->la_cookie, rc);
300 * If rqset is NULL, do not wait for response. Upcall and cookie could also
301 * be NULL in this case
/*
 * Send an OST_LADVISE RPC carrying @num_advise lu_ladvise entries from
 * @ladvise_hdr.  The LADVISE buffer is sized before packing; the
 * request goes to the OST I/O portal.  Completion is delivered via
 * osc_ladvise_interpret -> @upcall(@cookie).
 */
303 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
304 struct ladvise_hdr *ladvise_hdr,
305 obd_enqueue_update_f upcall, void *cookie,
306 struct ptlrpc_request_set *rqset)
308 struct ptlrpc_request *req;
309 struct ost_body *body;
310 struct osc_ladvise_args *la;
312 struct lu_ladvise *req_ladvise;
313 struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
314 int num_advise = ladvise_hdr->lah_count;
315 struct ladvise_hdr *req_ladvise_hdr;
318 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
/* size the variable-length advice array before packing */
322 req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
323 num_advise * sizeof(*ladvise));
324 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
326 ptlrpc_request_free(req);
329 req->rq_request_portal = OST_IO_PORTAL;
330 ptlrpc_at_set_req_timeout(req);
332 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
334 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
337 req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
338 &RMF_OST_LADVISE_HDR);
339 memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
341 req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
342 memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
343 ptlrpc_request_set_replen(req);
/* rqset == NULL path (condition elided): fire-and-forget via ptlrpcd */
346 /* Do not wait for response. */
347 ptlrpcd_add_req(req);
351 req->rq_interpret_reply = osc_ladvise_interpret;
352 CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
353 la = ptlrpc_req_async_args(req);
355 la->la_upcall = upcall;
356 la->la_cookie = cookie;
358 if (rqset == PTLRPCD_SET)
359 ptlrpcd_add_req(req);
361 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_CREATE.  Only reached for echo-client objects
 * (asserted via fid_seq_is_echo); regular object creation is handled
 * elsewhere.  On success unpacks the created object's obdo and
 * publishes the BRW size.
 */
366 static int osc_create(const struct lu_env *env, struct obd_export *exp,
369 struct ptlrpc_request *req;
370 struct ost_body *body;
375 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
376 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
378 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
380 GOTO(out, rc = -ENOMEM);
382 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
384 ptlrpc_request_free(req);
388 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
391 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
393 ptlrpc_request_set_replen(req);
395 rc = ptlrpc_queue_wait(req);
399 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
401 GOTO(out_req, rc = -EPROTO);
403 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
404 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
406 oa->o_blksize = cli_brw_size(exp->exp_obd);
407 oa->o_valid |= OBD_MD_FLBLKSZ;
409 CDEBUG(D_HA, "transno: %lld\n",
410 lustre_msg_get_transno(req->rq_repmsg));
412 ptlrpc_req_finished(req);
/*
 * Asynchronous OST_PUNCH (truncate/hole-punch).  The start/end range
 * travels inside @oa.  Completion is routed through
 * osc_setattr_interpret, which reuses struct osc_setattr_args.
 * Request is sent on the I/O portal (see bug 7198).
 */
417 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
418 obd_enqueue_update_f upcall, void *cookie,
419 struct ptlrpc_request_set *rqset)
421 struct ptlrpc_request *req;
422 struct osc_setattr_args *sa;
423 struct ost_body *body;
427 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
431 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
433 ptlrpc_request_free(req);
436 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
437 ptlrpc_at_set_req_timeout(req);
439 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
441 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
443 ptlrpc_request_set_replen(req);
445 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
446 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
447 sa = ptlrpc_req_async_args(req);
449 sa->sa_upcall = upcall;
450 sa->sa_cookie = cookie;
451 if (rqset == PTLRPCD_SET)
452 ptlrpcd_add_req(req);
454 ptlrpc_set_add_req(rqset, req);
/*
 * Reply interpreter for OST_SYNC: copy the reply obdo to the caller,
 * refresh the osc object's cached blocks attribute under the cl-object
 * attribute lock, then invoke the completion upcall.
 */
459 static int osc_sync_interpret(const struct lu_env *env,
460 struct ptlrpc_request *req,
463 struct osc_fsync_args *fa = arg;
464 struct ost_body *body;
465 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
466 unsigned long valid = 0;
467 struct cl_object *obj;
473 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
475 CERROR("can't unpack ost_body\n");
476 GOTO(out, rc = -EPROTO);
479 *fa->fa_oa = body->oa;
480 obj = osc2cl(fa->fa_obj);
482 /* Update osc object's blocks attribute */
483 cl_object_attr_lock(obj);
484 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
485 attr->cat_blocks = body->oa.o_blocks;
/* 'valid' gains CAT_BLOCKS in the elided branch body */
490 cl_object_attr_update(env, obj, attr, valid);
491 cl_object_attr_unlock(obj);
494 rc = fa->fa_upcall(fa->fa_cookie, rc);
/*
 * Asynchronous OST_SYNC for @obj.  The sync byte range is carried in
 * @oa's size/blocks fields (overloaded — see comment below).
 * Completion goes through osc_sync_interpret -> @upcall(@cookie).
 */
498 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
499 obd_enqueue_update_f upcall, void *cookie,
500 struct ptlrpc_request_set *rqset)
502 struct obd_export *exp = osc_export(obj);
503 struct ptlrpc_request *req;
504 struct ost_body *body;
505 struct osc_fsync_args *fa;
509 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
513 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
515 ptlrpc_request_free(req);
519 /* overload the size and blocks fields in the oa with start/end */
520 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
522 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
524 ptlrpc_request_set_replen(req);
525 req->rq_interpret_reply = osc_sync_interpret;
527 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
528 fa = ptlrpc_req_async_args(req);
531 fa->fa_upcall = upcall;
532 fa->fa_cookie = cookie;
534 if (rqset == PTLRPCD_SET)
535 ptlrpcd_add_req(req);
537 ptlrpc_set_add_req(rqset, req);
542 /* Find and cancel locally locks matched by @mode in the resource found by
543 * @objid. Found locks are added into @cancel list. Returns the amount of
544 * locks added to @cancels list. */
545 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
546 struct list_head *cancels,
547 enum ldlm_mode mode, __u64 lock_flags)
549 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
550 struct ldlm_res_id res_id;
551 struct ldlm_resource *res;
555 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
556 * export) but disabled through procfs (flag in NS).
558 * This distinguishes from a case when ELC is not supported originally,
559 * when we still want to cancel locks in advance and just cancel them
560 * locally, without sending any RPC. */
561 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* look up the resource for this object and cancel matching local locks */
564 ostid_build_res_name(&oa->o_oi, &res_id);
565 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* take a debug ref across the cancel scan, then release both refs */
569 LDLM_RESOURCE_ADDREF(res);
570 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
571 lock_flags, 0, NULL);
572 LDLM_RESOURCE_DELREF(res);
573 ldlm_resource_putref(res);
/*
 * Destroy-RPC completion: drop the in-flight counter and wake any
 * thread throttled in osc_destroy() waiting to send its destroy.
 */
577 static int osc_destroy_interpret(const struct lu_env *env,
578 struct ptlrpc_request *req, void *data,
581 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
583 atomic_dec(&cli->cl_destroy_in_flight);
584 wake_up(&cli->cl_destroy_waitq);
/*
 * Try to reserve a destroy-RPC slot: optimistically increment
 * cl_destroy_in_flight; if that exceeds cl_max_rpcs_in_flight, undo
 * the increment.  The dec/re-check handles the race where another
 * sender's decrement lands between our two atomic ops, in which case
 * we must re-wake the queue ourselves.
 */
588 static int osc_can_send_destroy(struct client_obd *cli)
590 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
591 cli->cl_max_rpcs_in_flight) {
592 /* The destroy request can be sent */
595 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
596 cli->cl_max_rpcs_in_flight) {
598 * The counter has been modified between the two atomic
601 wake_up(&cli->cl_destroy_waitq);
/*
 * OST_DESTROY: cancel cached PW locks on the object locally (ELC,
 * discarding dirty data), build the destroy request with the cancel
 * list piggy-backed (ldlm_prep_elc_req), throttle against
 * max_rpcs_in_flight via osc_can_send_destroy(), and send
 * fire-and-forget through ptlrpcd.
 */
606 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
609 struct client_obd *cli = &exp->exp_obd->u.cli;
610 struct ptlrpc_request *req;
611 struct ost_body *body;
612 struct list_head cancels = LIST_HEAD_INIT(cancels);
617 CDEBUG(D_INFO, "oa NULL\n");
621 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
622 LDLM_FL_DISCARD_DATA);
624 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* alloc failed: release the locks collected for early cancel */
626 ldlm_lock_list_put(&cancels, l_bl_ast, count);
630 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
633 ptlrpc_request_free(req);
637 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
638 ptlrpc_at_set_req_timeout(req);
640 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
642 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
644 ptlrpc_request_set_replen(req);
646 req->rq_interpret_reply = osc_destroy_interpret;
647 if (!osc_can_send_destroy(cli)) {
648 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
651 * Wait until the number of on-going destroy RPCs drops
652 * under max_rpc_in_flight
654 l_wait_event_exclusive(cli->cl_destroy_waitq,
655 osc_can_send_destroy(cli), &lwi);
658 /* Do not wait for response */
659 ptlrpcd_add_req(req);
/*
 * Fill the dirty/undirty/grant accounting fields of @oa before a BRW
 * or grant-shrink RPC, under cl_loi_list_lock.  Reports current dirty
 * bytes, how much more the client wants to be able to dirty
 * (o_undirty, including per-extent tax when GRANT_PARAM is
 * negotiated), available grant, and lost grant (o_dropped).  Performs
 * several sanity CERROR checks on the dirty counters.
 */
663 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
666 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
/* caller must not have pre-set the fields we are about to fill */
668 LASSERT(!(oa->o_valid & bits));
671 spin_lock(&cli->cl_loi_list_lock);
672 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
673 oa->o_dirty = cli->cl_dirty_grant;
675 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
676 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
677 cli->cl_dirty_max_pages)) {
678 CERROR("dirty %lu - %lu > dirty_max %lu\n",
679 cli->cl_dirty_pages, cli->cl_dirty_transit,
680 cli->cl_dirty_max_pages);
682 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
683 atomic_long_read(&obd_dirty_transit_pages) >
684 (long)(obd_max_dirty_pages + 1))) {
685 /* The atomic_read() allowing the atomic_inc() are
686 * not covered by a lock thus they may safely race and trip
687 * this CERROR() unless we add in a small fudge factor (+1). */
688 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
689 cli_name(cli), atomic_long_read(&obd_dirty_pages),
690 atomic_long_read(&obd_dirty_transit_pages),
691 obd_max_dirty_pages);
693 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
695 CERROR("dirty %lu - dirty_max %lu too big???\n",
696 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
/* normal path (elided else): compute desired additional dirty space */
699 unsigned long nrpages;
701 nrpages = cli->cl_max_pages_per_rpc;
702 nrpages *= cli->cl_max_rpcs_in_flight + 1;
703 nrpages = max(nrpages, cli->cl_dirty_max_pages);
704 oa->o_undirty = nrpages << PAGE_SHIFT;
705 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
709 /* take extent tax into account when asking for more
711 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
712 cli->cl_max_extent_pages;
713 oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
716 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
717 oa->o_dropped = cli->cl_lost_grant;
/* lost grant has been reported; reset the counter */
718 cli->cl_lost_grant = 0;
719 spin_unlock(&cli->cl_loi_list_lock);
720 CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
721 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant-shrink check cl_grant_shrink_interval from now. */
724 void osc_update_next_shrink(struct client_obd *cli)
726 cli->cl_next_shrink_grant =
727 cfs_time_shift(cli->cl_grant_shrink_interval);
728 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
729 cli->cl_next_shrink_grant);
/* Add @grant bytes to available grant, under cl_loi_list_lock. */
732 static void __osc_update_grant(struct client_obd *cli, u64 grant)
734 spin_lock(&cli->cl_loi_list_lock);
735 cli->cl_avail_grant += grant;
736 spin_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server returned in an RPC reply body. */
739 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
741 if (body->oa.o_valid & OBD_MD_FLGRANT) {
742 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
743 __osc_update_grant(cli, body->oa.o_grant);
/* forward declaration: used by osc_shrink_grant_to_target() below */
747 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
748 u32 keylen, void *key,
749 u32 vallen, void *val,
750 struct ptlrpc_request_set *set);
/*
 * Grant-shrink RPC completion: on failure (elided branch) return the
 * tentatively-shrunk grant (oa->o_grant) to cl_avail_grant; on success
 * absorb whatever grant the server's reply carries.
 */
752 static int osc_shrink_grant_interpret(const struct lu_env *env,
753 struct ptlrpc_request *req,
756 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
757 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
758 struct ost_body *body;
761 __osc_update_grant(cli, oa->o_grant);
765 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
767 osc_update_grant(cli, body);
/*
 * Piggy-back a grant shrink on an outgoing BRW: give back a quarter of
 * the available grant via @oa, flag it with OBD_FL_SHRINK_GRANT, and
 * reschedule the next shrink check.
 */
773 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
775 spin_lock(&cli->cl_loi_list_lock);
776 oa->o_grant = cli->cl_avail_grant / 4;
777 cli->cl_avail_grant -= oa->o_grant;
778 spin_unlock(&cli->cl_loi_list_lock);
779 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
780 oa->o_valid |= OBD_MD_FLFLAGS;
/* o_flags = 0 initialisation elided in this listing */
783 oa->o_flags |= OBD_FL_SHRINK_GRANT;
784 osc_update_next_shrink(cli);
787 /* Shrink the current grant, either from some large amount to enough for a
788 * full set of in-flight RPCs, or if we have already shrunk to that limit
789 * then to enough for a single RPC. This avoids keeping more grant than
790 * needed, and avoids shrinking the grant piecemeal. */
791 static int osc_shrink_grant(struct client_obd *cli)
793 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
794 (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
796 spin_lock(&cli->cl_loi_list_lock);
/* already at/below the full-pipeline target: fall back to one RPC's worth */
797 if (cli->cl_avail_grant <= target_bytes)
798 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
799 spin_unlock(&cli->cl_loi_list_lock);
801 return osc_shrink_grant_to_target(cli, target_bytes);
/*
 * Shrink available grant down to @target_bytes (clamped to at least
 * one RPC's worth) and report the released amount to the server via a
 * KEY_GRANT_SHRINK set_info RPC.  On send failure (elided branch) the
 * grant is restored via __osc_update_grant().
 */
804 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
807 struct ost_body *body;
810 spin_lock(&cli->cl_loi_list_lock);
811 /* Don't shrink if we are already above or below the desired limit
812 * We don't want to shrink below a single RPC, as that will negatively
813 * impact block allocation and long-term performance. */
814 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
815 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
817 if (target_bytes >= cli->cl_avail_grant) {
818 spin_unlock(&cli->cl_loi_list_lock);
821 spin_unlock(&cli->cl_loi_list_lock);
/* body allocation elided; announce current cache state first */
827 osc_announce_cached(cli, &body->oa, 0);
829 spin_lock(&cli->cl_loi_list_lock);
830 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
831 cli->cl_avail_grant = target_bytes;
832 spin_unlock(&cli->cl_loi_list_lock);
833 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
834 body->oa.o_valid |= OBD_MD_FLFLAGS;
835 body->oa.o_flags = 0;
837 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
838 osc_update_next_shrink(cli);
840 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
841 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
842 sizeof(*body), body, NULL);
/* on rc != 0 (condition elided) give the grant back locally */
844 __osc_update_grant(cli, body->oa.o_grant);
/*
 * Decide whether it is time to shrink this client's grant: requires
 * the server to support OBD_CONNECT_GRANT_SHRINK, the shrink deadline
 * to be (nearly) reached, a FULL import, and more available grant than
 * a single RPC needs.  Otherwise just reschedules the next check.
 */
849 static int osc_should_shrink_grant(struct client_obd *client)
851 cfs_time_t time = cfs_time_current();
852 cfs_time_t next_shrink = client->cl_next_shrink_grant;
854 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
855 OBD_CONNECT_GRANT_SHRINK) == 0)
/* allow firing slightly (5 ticks) before the nominal deadline */
858 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
859 /* Get the current RPC size directly, instead of going via:
860 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
861 * Keep comment here so that it can be found by searching. */
862 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
864 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
865 client->cl_avail_grant > brw_size)
868 osc_update_next_shrink(client);
/*
 * Periodic timeout callback: walk every client on the shrink list and
 * shrink grant for those that are due.
 */
873 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
875 struct client_obd *client;
877 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
878 if (osc_should_shrink_grant(client))
879 osc_shrink_grant(client);
/*
 * Register @client with the shared ptlrpc timeout mechanism so
 * osc_grant_shrink_grant_cb() runs every cl_grant_shrink_interval,
 * and arm the first deadline.
 */
884 static int osc_add_shrink_grant(struct client_obd *client)
888 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
890 osc_grant_shrink_grant_cb, NULL,
891 &client->cl_grant_shrink_list);
893 CERROR("add grant client %s error %d\n", cli_name(client), rc);
896 CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
897 osc_update_next_shrink(client);
/* Unregister @client from the grant-shrink timeout list. */
901 static int osc_del_shrink_grant(struct client_obd *client)
903 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/*
 * Initialize grant state from the server's connect data at (re)connect
 * time: compute cl_avail_grant (accounting for dirty/reserved grant
 * unless we were evicted), derive chunk size, extent tax and maximum
 * extent size when the server supports GRANT_PARAM, and enroll in
 * periodic grant shrinking if GRANT_SHRINK was negotiated.
 */
907 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
910 * ocd_grant is the total grant amount we're expect to hold: if we've
911 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
912 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
915 * race is tolerable here: if we're evicted, but imp_state already
916 * left EVICTED state, then cl_dirty_pages must be 0 already.
918 spin_lock(&cli->cl_loi_list_lock);
919 cli->cl_avail_grant = ocd->ocd_grant;
920 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
921 cli->cl_avail_grant -= cli->cl_reserved_grant;
922 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
923 cli->cl_avail_grant -= cli->cl_dirty_grant;
925 cli->cl_avail_grant -=
926 cli->cl_dirty_pages << PAGE_SHIFT;
929 if (cli->cl_avail_grant < 0) {
930 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
931 cli_name(cli), cli->cl_avail_grant,
932 ocd->ocd_grant, cli->cl_dirty_pages << PAGE_SHIFT);
933 /* workaround for servers which do not have the patch from
935 cli->cl_avail_grant = ocd->ocd_grant;
938 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
942 /* overhead for each extent insertion */
943 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
944 /* determine the appropriate chunk size used by osc_extent. */
945 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
946 ocd->ocd_grant_blkbits);
947 /* max_pages_per_rpc must be chunk aligned */
948 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
949 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
950 ~chunk_mask) & chunk_mask;
951 /* determine maximum extent size, in #pages */
952 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
953 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
954 if (cli->cl_max_extent_pages == 0)
955 cli->cl_max_extent_pages = 1;
/* no GRANT_PARAM: conservative defaults (else branch elided) */
957 cli->cl_grant_extent_tax = 0;
958 cli->cl_chunkbits = PAGE_SHIFT;
959 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
961 spin_unlock(&cli->cl_loi_list_lock);
963 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
964 "chunk bits: %d cl_max_extent_pages: %d\n",
966 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
967 cli->cl_max_extent_pages);
969 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
970 list_empty(&cli->cl_grant_shrink_list))
971 osc_add_shrink_grant(cli);
974 /* We assume that the reason this OSC got a short read is because it read
975 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
976 * via the LOV, and it _knows_ it's reading inside the file, it's just that
977 * this stripe never got written at or beyond this stripe offset yet. */
978 static void handle_short_read(int nob_read, size_t page_count,
979 struct brw_page **pga)
/* Walk the page array: skip fully-read pages, zero the tail of the
 * page containing EOF, then zero all remaining pages.  kunmap calls
 * are elided in this listing. */
984 /* skip bytes read OK */
985 while (nob_read > 0) {
986 LASSERT (page_count > 0);
988 if (pga[i]->count > nob_read) {
989 /* EOF inside this page */
990 ptr = kmap(pga[i]->pg) +
991 (pga[i]->off & ~PAGE_MASK);
992 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
999 nob_read -= pga[i]->count;
1004 /* zero remaining pages */
1005 while (page_count-- > 0) {
1006 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1007 memset(ptr, 0, pga[i]->count);
/*
 * Validate the per-niobuf RC vector in a BRW_WRITE reply: propagate
 * the first negative rc, reject any nonzero rc as protocol garbage,
 * and verify that the bulk actually transferred the requested bytes.
 */
1013 static int check_write_rcs(struct ptlrpc_request *req,
1014 int requested_nob, int niocount,
1015 size_t page_count, struct brw_page **pga)
1020 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1021 sizeof(*remote_rcs) *
1023 if (remote_rcs == NULL) {
1024 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1028 /* return error if any niobuf was in error */
1029 for (i = 0; i < niocount; i++) {
1030 if ((int)remote_rcs[i] < 0)
1031 return(remote_rcs[i]);
1033 if (remote_rcs[i] != 0) {
1034 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1035 i, remote_rcs[i], req);
/* short/long bulk transfer is a fatal protocol error */
1040 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1041 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1042 req->rq_bulk->bd_nob_transferred, requested_nob);
/*
 * Two brw_pages can share one niobuf iff they are byte-contiguous and
 * their flags agree on everything outside the known-safe-to-mix set
 * (grant/cache/sync/quota bits).  Unknown flag differences are merged
 * anyway but warned about.
 */
1049 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1051 if (p1->flag != p2->flag) {
1052 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1053 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1054 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1056 /* warn if we try to combine flags that we don't know to be
1057 * safe to combine */
1058 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1059 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1060 "report this at https://jira.hpdd.intel.com/\n",
1061 p1->flag, p2->flag);
/* mergeable only when p2 starts exactly where p1 ends */
1066 return (p1->off + p1->count == p2->off);
/*
 * Compute the bulk checksum over up to @nob bytes of @pga using the
 * libcfs crypto hash selected by @cksum_type.  Contains two fault-
 * injection hooks: OBD_FAIL_OSC_CHECKSUM_RECEIVE corrupts read data
 * before hashing; OBD_FAIL_OSC_CHECKSUM_SEND flips the computed write
 * checksum instead of the data (so a resend is still correct).
 */
1069 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1070 struct brw_page **pga, int opc,
1071 cksum_type_t cksum_type)
1075 struct cfs_crypto_hash_desc *hdesc;
1076 unsigned int bufsize;
1078 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1080 LASSERT(pg_count > 0);
1082 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1083 if (IS_ERR(hdesc)) {
1084 CERROR("Unable to initialize checksum hash %s\n",
1085 cfs_crypto_hash_name(cfs_alg));
1086 return PTR_ERR(hdesc);
1089 while (nob > 0 && pg_count > 0) {
1090 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1092 /* corrupt the data before we compute the checksum, to
1093 * simulate an OST->client data error */
1094 if (i == 0 && opc == OST_READ &&
1095 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1096 unsigned char *ptr = kmap(pga[i]->pg);
1097 int off = pga[i]->off & ~PAGE_MASK;
1099 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
/* kunmap of the corrupted page is elided in this listing */
1102 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1103 pga[i]->off & ~PAGE_MASK,
1105 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1106 (int)(pga[i]->off & ~PAGE_MASK));
1108 nob -= pga[i]->count;
1113 bufsize = sizeof(cksum);
1114 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1116 /* For sending we only compute the wrong checksum instead
1117 * of corrupting the data so it is still correct on a redo */
1118 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Build a complete BRW (bulk read/write) request:
 *  - allocate the request (writes come from the shared osc_rq_pool),
 *  - count mergeable pages into niobufs and size the capsule,
 *  - set up the bulk descriptor and register every page fragment,
 *  - pack obdo/ioobj/niobuf arrays, enforcing that the page array is
 *    contiguous, sorted by offset, and consistent in SRVLOCK usage,
 *  - announce cached/dirty state, optionally piggy-back a grant
 *    shrink, and compute/attach the bulk checksum per direction,
 *  - stash osc_brw_async_args for the reply interpreter and return
 *    the request via *reqp.
 * @resend distinguishes a recovery resend (sets OBD_FL_RECOV_RESEND).
 */
1125 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1126 u32 page_count, struct brw_page **pga,
1127 struct ptlrpc_request **reqp, int resend)
1129 struct ptlrpc_request *req;
1130 struct ptlrpc_bulk_desc *desc;
1131 struct ost_body *body;
1132 struct obd_ioobj *ioobj;
1133 struct niobuf_remote *niobuf;
1134 int niocount, i, requested_nob, opc, rc;
1135 struct osc_brw_async_args *aa;
1136 struct req_capsule *pill;
1137 struct brw_page *pg_prev;
/* fault-injection entry points for testing allocation failure paths */
1140 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1141 RETURN(-ENOMEM); /* Recoverable */
1142 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1143 RETURN(-EINVAL); /* Fatal */
1145 if ((cmd & OBD_BRW_WRITE) != 0) {
/* writes draw from the preallocated pool so cached dirty data can
 * still be flushed under memory pressure */
1147 req = ptlrpc_request_alloc_pool(cli->cl_import,
1149 &RQF_OST_BRW_WRITE);
1152 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* one niobuf per run of mergeable pages */
1157 for (niocount = i = 1; i < page_count; i++) {
1158 if (!can_merge_pages(pga[i - 1], pga[i]))
1162 pill = &req->rq_pill;
1163 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1165 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1166 niocount * sizeof(*niobuf));
1168 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1170 ptlrpc_request_free(req);
1173 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1174 ptlrpc_at_set_req_timeout(req);
1175 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1177 req->rq_no_retry_einprogress = 1;
1179 desc = ptlrpc_prep_bulk_imp(req, page_count,
1180 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1181 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1182 PTLRPC_BULK_PUT_SINK) |
1183 PTLRPC_BULK_BUF_KIOV,
1185 &ptlrpc_bulk_kiov_pin_ops);
1188 GOTO(out, rc = -ENOMEM);
1189 /* NB request now owns desc and will free it when it gets freed */
1191 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1192 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1193 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1194 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1196 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1198 obdo_to_ioobj(oa, ioobj);
1199 ioobj->ioo_bufcnt = niocount;
1200 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1201 * that might be send for this request. The actual number is decided
1202 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1203 * "max - 1" for old client compatibility sending "0", and also so the
1204 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1205 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1206 LASSERT(page_count > 0);
1208 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1209 struct brw_page *pg = pga[i];
1210 int poff = pg->off & ~PAGE_MASK;
1212 LASSERT(pg->count > 0);
1213 /* make sure there is no gap in the middle of page array */
1214 LASSERTF(page_count == 1 ||
1215 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1216 ergo(i > 0 && i < page_count - 1,
1217 poff == 0 && pg->count == PAGE_SIZE) &&
1218 ergo(i == page_count - 1, poff == 0)),
1219 "i: %d/%d pg: %p off: %llu, count: %u\n",
1220 i, page_count, pg, pg->off, pg->count);
1221 LASSERTF(i == 0 || pg->off > pg_prev->off,
1222 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1223 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1225 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1226 pg_prev->pg, page_private(pg_prev->pg),
1227 pg_prev->pg->index, pg_prev->off);
1228 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1229 (pg->flag & OBD_BRW_SRVLOCK));
1231 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1232 requested_nob += pg->count;
/* extend the current niobuf or start a new one (niobuf++ in the
 * elided else path handling) */
1234 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1236 niobuf->rnb_len += pg->count;
1238 niobuf->rnb_offset = pg->off;
1239 niobuf->rnb_len = pg->count;
1240 niobuf->rnb_flags = pg->flag;
/* sanity: we must have filled exactly 'niocount' niobufs */
1245 LASSERTF((void *)(niobuf - niocount) ==
1246 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1247 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1248 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1250 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
/* resend path (condition elided): tag for recovery resend */
1252 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1253 body->oa.o_valid |= OBD_MD_FLFLAGS;
1254 body->oa.o_flags = 0;
1256 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1259 if (osc_should_shrink_grant(cli))
1260 osc_shrink_grant_local(cli, &body->oa);
1262 /* size[REQ_REC_OFF] still sizeof (*body) */
1263 if (opc == OST_WRITE) {
1264 if (cli->cl_checksum &&
1265 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1266 /* store cl_cksum_type in a local variable since
1267 * it can be changed via lprocfs */
1268 cksum_type_t cksum_type = cli->cl_cksum_type;
1270 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1271 oa->o_flags &= OBD_FL_LOCAL_MASK;
1272 body->oa.o_flags = 0;
1274 body->oa.o_flags |= cksum_type_pack(cksum_type);
1275 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1276 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1280 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1282 /* save this in 'oa', too, for later checking */
1283 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1284 oa->o_flags |= cksum_type_pack(cksum_type);
1286 /* clear out the checksum flag, in case this is a
1287 * resend but cl_checksum is no longer set. b=11238 */
1288 oa->o_valid &= ~OBD_MD_FLCKSUM;
1290 oa->o_cksum = body->oa.o_cksum;
1291 /* 1 RC per niobuf */
1292 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1293 sizeof(__u32) * niocount);
/* read path (else branch elided): only request a checksum */
1295 if (cli->cl_checksum &&
1296 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1297 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1298 body->oa.o_flags = 0;
1299 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1300 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1303 ptlrpc_request_set_replen(req);
1305 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1306 aa = ptlrpc_req_async_args(req);
1308 aa->aa_requested_nob = requested_nob;
1309 aa->aa_nio_count = niocount;
1310 aa->aa_page_count = page_count;
1314 INIT_LIST_HEAD(&aa->aa_oaps);
1317 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1318 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1319 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1320 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
/* error path: release the request (labels elided) */
1324 ptlrpc_req_finished(req);
/*
 * Diagnose a bulk-write checksum mismatch reported by the OST.
 *
 * Called when the server-computed checksum of a write differs from the
 * one the client sent.  Re-checksums the local pages (via
 * osc_checksum_bulk) and compares the fresh value against both the
 * original client checksum and the server's, to classify where the data
 * changed: wrong checksum type on the server, pages modified locally
 * after checksumming (typical of mmap IO, bug 11742), or corruption in
 * transit.  Logs the verdict via LCONSOLE_ERROR_MSG/CERROR.
 *
 * NOTE(review): several source lines are elided in this excerpt (body
 * opening, declarations of new_cksum/msg, return paths) — the comments
 * below describe only what the visible code shows.
 */
1328 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1329 __u32 client_cksum, __u32 server_cksum, int nob,
1330 size_t page_count, struct brw_page **pga,
1331 cksum_type_t client_cksum_type)
1335 cksum_type_t cksum_type;
/* Fast path: server agrees with what we sent — nothing to diagnose. */
1337 if (server_cksum == client_cksum) {
1338 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Recompute the checksum over the local pages using the type the
 * server actually used (unpacked from oa->o_flags when valid). */
1342 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1344 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* Classify the mismatch for the console message. */
1347 if (cksum_type != client_cksum_type)
1348 msg = "the server did not use the checksum type specified in "
1349 "the original request - likely a protocol problem";
1350 else if (new_cksum == server_cksum)
1351 msg = "changed on the client after we checksummed it - "
1352 "likely false positive due to mmap IO (bug 11742)";
1353 else if (new_cksum == client_cksum)
1354 msg = "changed in transit before arrival at OST";
1356 msg = "changed in transit AND doesn't match the original - "
1357 "likely false positive due to mmap IO (bug 11742)";
/* Report the affected FID/object/extent; parent FID fields are only
 * meaningful when OBD_MD_FLFID is set in oa->o_valid. */
1359 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1360 " object "DOSTID" extent [%llu-%llu]\n",
1361 msg, libcfs_nid2str(peer->nid),
1362 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1363 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1364 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1365 POSTID(&oa->o_oi), pga[0]->off,
1366 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1367 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1368 "client csum now %x\n", client_cksum, client_cksum_type,
1369 server_cksum, cksum_type, new_cksum);
1373 /* Note rc enters this function as number of bytes transferred */
/*
 * Finish processing a completed BRW (bulk read/write) request.
 *
 * Unpacks the reply body, updates quota and grant state, verifies bulk
 * checksums for both the write path (via check_write_checksum) and the
 * read path (recomputed locally and compared against body->oa.o_cksum),
 * handles short reads, and finally copies the wire obdo back into the
 * async args' obdo.
 *
 * NOTE(review): interior lines are elided in this excerpt (GOTO/RETURN
 * paths, some declarations) — comments describe only the visible code.
 */
1374 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1376 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1377 const lnet_process_id_t *peer =
1378 &req->rq_import->imp_connection->c_peer;
1379 struct client_obd *cli = aa->aa_cli;
1380 struct ost_body *body;
1381 u32 client_cksum = 0;
/* A negative rc (other than -EDQUOT) means the request itself failed. */
1384 if (rc < 0 && rc != -EDQUOT) {
1385 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1389 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1390 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1392 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1396 /* set/clear over quota flag for a uid/gid */
1397 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1398 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1399 unsigned int qid[LL_MAXQUOTAS] =
1400 {body->oa.o_uid, body->oa.o_gid};
1402 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
1403 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1405 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
/* Refresh the client's grant accounting from the server reply. */
1408 osc_update_grant(cli, body);
/* Remember the checksum we sent, for write verification below. */
1413 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1414 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1416 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1418 CERROR("Unexpected +ve rc %d\n", rc);
1421 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1423 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* Verify the server's write checksum against our saved one. */
1426 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1427 check_write_checksum(&body->oa, peer, client_cksum,
1428 body->oa.o_cksum, aa->aa_requested_nob,
1429 aa->aa_page_count, aa->aa_ppga,
1430 cksum_type_unpack(aa->aa_oa->o_flags)))
1433 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1434 aa->aa_page_count, aa->aa_ppga);
1438 /* The rest of this function executes only for OST_READs */
1440 /* if unwrap_bulk failed, return -EAGAIN to retry */
1441 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1443 GOTO(out, rc = -EAGAIN);
/* rc is the number of bytes read; sanity-check it against what we
 * asked for and what the bulk layer says was transferred. */
1445 if (rc > aa->aa_requested_nob) {
1446 CERROR("Unexpected rc %d (%d requested)\n", rc,
1447 aa->aa_requested_nob);
1451 if (rc != req->rq_bulk->bd_nob_transferred) {
1452 CERROR ("Unexpected rc %d (%d transferred)\n",
1453 rc, req->rq_bulk->bd_nob_transferred);
/* Short read: zero-fill the tail of the page array. */
1457 if (rc < aa->aa_requested_nob)
1458 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1460 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1461 static int cksum_counter;
1462 u32 server_cksum = body->oa.o_cksum;
1465 cksum_type_t cksum_type;
1467 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1468 body->oa.o_flags : 0);
1469 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1470 aa->aa_ppga, OST_READ,
/* If the bulk came through a router, name it in the error. */
1473 if (peer->nid != req->rq_bulk->bd_sender) {
1475 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1478 if (server_cksum != client_cksum) {
1479 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1480 "%s%s%s inode "DFID" object "DOSTID
1481 " extent [%llu-%llu]\n",
1482 req->rq_import->imp_obd->obd_name,
1483 libcfs_nid2str(peer->nid),
1485 body->oa.o_valid & OBD_MD_FLFID ?
1486 body->oa.o_parent_seq : (__u64)0,
1487 body->oa.o_valid & OBD_MD_FLFID ?
1488 body->oa.o_parent_oid : 0,
1489 body->oa.o_valid & OBD_MD_FLFID ?
1490 body->oa.o_parent_ver : 0,
1491 POSTID(&body->oa.o_oi),
1492 aa->aa_ppga[0]->off,
1493 aa->aa_ppga[aa->aa_page_count-1]->off +
1494 aa->aa_ppga[aa->aa_page_count-1]->count -
1496 CERROR("client %x, server %x, cksum_type %x\n",
1497 client_cksum, server_cksum, cksum_type);
1499 aa->aa_oa->o_cksum = client_cksum;
1503 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1506 } else if (unlikely(client_cksum)) {
1507 static int cksum_missed;
/* Power-of-two throttling: log only when cksum_missed is 2^k. */
1510 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1511 CERROR("Checksum %u requested from %s but not sent\n",
1512 cksum_missed, libcfs_nid2str(peer->nid));
/* Copy the reply obdo back into the caller-visible async args. */
1518 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1519 aa->aa_oa, &body->oa);
/*
 * Rebuild and resend a BRW request that failed with a recoverable error
 * (e.g. -EINPROGRESS).
 *
 * A brand-new request is prepared from the old request's async args; the
 * new request inherits the interpret/commit callbacks, async args, page
 * array and oap/extent lists from the old one, then is handed to ptlrpcd.
 * If any osc_async_page was interrupted, the redo is abandoned.
 *
 * NOTE(review): interior lines are elided in this excerpt (error
 * returns, some list handling) — comments describe only visible code.
 */
1524 static int osc_brw_redo_request(struct ptlrpc_request *request,
1525 struct osc_brw_async_args *aa, int rc)
1527 struct ptlrpc_request *new_req;
1528 struct osc_brw_async_args *new_aa;
1529 struct osc_async_page *oap;
1532 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1533 "redo for recoverable error %d", rc);
/* Re-prepare the same I/O: same cli, obdo, and page array. */
1535 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1536 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1537 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1538 aa->aa_ppga, &new_req, 1);
/* Every oap must still point at the request being redone; bail out
 * (dropping the new request) if any page was interrupted. */
1542 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1543 if (oap->oap_request != NULL) {
1544 LASSERTF(request == oap->oap_request,
1545 "request %p != oap_request %p\n",
1546 request, oap->oap_request);
1547 if (oap->oap_interrupted) {
1548 ptlrpc_req_finished(new_req);
1553 /* New request takes over pga and oaps from old request.
1554 * Note that copying a list_head doesn't work, need to move it... */
1556 new_req->rq_interpret_reply = request->rq_interpret_reply;
1557 new_req->rq_async_args = request->rq_async_args;
1558 new_req->rq_commit_cb = request->rq_commit_cb;
1559 /* cap resend delay to the current request timeout, this is similar to
1560 * what ptlrpc does (see after_reply()) */
1561 if (aa->aa_resends > new_req->rq_timeout)
1562 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1564 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1565 new_req->rq_generation_set = 1;
1566 new_req->rq_import_generation = request->rq_import_generation;
/* Splice the oap and extent lists into the new request's async args
 * (list_heads must be moved, not copied — see comment above). */
1568 new_aa = ptlrpc_req_async_args(new_req);
1570 INIT_LIST_HEAD(&new_aa->aa_oaps);
1571 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1572 INIT_LIST_HEAD(&new_aa->aa_exts);
1573 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1574 new_aa->aa_resends = aa->aa_resends;
/* Retarget each oap's request reference from the old to the new req. */
1576 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1577 if (oap->oap_request) {
1578 ptlrpc_req_finished(oap->oap_request);
1579 oap->oap_request = ptlrpc_request_addref(new_req);
1583 /* XXX: This code will run into problem if we're going to support
1584 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1585 * and wait for all of them to be finished. We should inherit request
1586 * set from old request. */
1587 ptlrpcd_add_req(new_req);
1589 DEBUG_REQ(D_INFO, new_req, "new request");
1594 * ugh, we want disk allocation on the target to happen in offset order. we'll
1595 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1596 * fine for our small page arrays and doesn't require allocation. its an
1597 * insertion sort that swaps elements that are strides apart, shrinking the
1598 * stride down until its '1' and the array is sorted.
/*
 * Shellsort the brw_page array by pg->off, ascending.
 * NOTE(review): interior loop lines are elided in this excerpt; the
 * visible code shows the 3x+1 stride sequence and the inner insertion
 * pass.
 */
1600 static void sort_brw_pages(struct brw_page **array, int num)
1603 struct brw_page *tmp;
/* Build up the largest stride < num using Knuth's 3x+1 sequence. */
1607 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* Insertion sort of elements that are `stride` apart. */
1612 for (i = stride ; i < num ; i++) {
1615 while (j >= stride && array[j - stride]->off > tmp->off) {
1616 array[j] = array[j - stride];
1621 } while (stride > 1);
/*
 * Free a page-pointer array of `count` entries previously allocated with
 * OBD_ALLOC (e.g. in osc_build_rpc).  Frees only the array itself, not
 * the brw_page objects it points to.
 */
1624 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1626 LASSERT(ppga != NULL);
1627 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Reply-interpret callback for BRW RPCs (set as rq_interpret_reply in
 * osc_build_rpc).
 *
 * Finishes the request via osc_brw_fini_request(); on recoverable errors
 * either redoes the RPC (osc_brw_redo_request) or logs and gives up.  On
 * success it propagates blocks/times (and for writes, size/kms) from the
 * reply obdo into the cl_object attributes, finishes all extents, frees
 * the page array, updates the in-flight RPC counters and re-plugs the IO
 * queue.
 *
 * NOTE(review): interior lines are elided in this excerpt (rc checks,
 * some valid-flag updates, RETURN) — comments describe visible code only.
 */
1630 static int brw_interpret(const struct lu_env *env,
1631 struct ptlrpc_request *req, void *data, int rc)
1633 struct osc_brw_async_args *aa = data;
1634 struct osc_extent *ext;
1635 struct osc_extent *tmp;
1636 struct client_obd *cli = aa->aa_cli;
1639 rc = osc_brw_fini_request(req, rc);
1640 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1641 /* When server return -EINPROGRESS, client should always retry
1642 * regardless of the number of times the bulk was resent already. */
1643 if (osc_recoverable_error(rc)) {
1644 if (req->rq_import_generation !=
1645 req->rq_import->imp_generation) {
1646 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1647 ""DOSTID", rc = %d.\n",
1648 req->rq_import->imp_obd->obd_name,
1649 POSTID(&aa->aa_oa->o_oi), rc);
1650 } else if (rc == -EINPROGRESS ||
1651 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1652 rc = osc_brw_redo_request(req, aa, rc);
1654 CERROR("%s: too many resent retries for object: "
1655 "%llu:%llu, rc = %d.\n",
1656 req->rq_import->imp_obd->obd_name,
1657 POSTID(&aa->aa_oa->o_oi), rc);
1662 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* Success path: push reply attributes into the cl_object. */
1667 struct obdo *oa = aa->aa_oa;
1668 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1669 unsigned long valid = 0;
1670 struct cl_object *obj;
1671 struct osc_async_page *last;
1673 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1674 obj = osc2cl(last->oap_obj);
1676 cl_object_attr_lock(obj);
1677 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1678 attr->cat_blocks = oa->o_blocks;
1679 valid |= CAT_BLOCKS;
1681 if (oa->o_valid & OBD_MD_FLMTIME) {
1682 attr->cat_mtime = oa->o_mtime;
1685 if (oa->o_valid & OBD_MD_FLATIME) {
1686 attr->cat_atime = oa->o_atime;
1689 if (oa->o_valid & OBD_MD_FLCTIME) {
1690 attr->cat_ctime = oa->o_ctime;
1694 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1695 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1696 loff_t last_off = last->oap_count + last->oap_obj_off +
1699 /* Change file size if this is an out of quota or
1700 * direct IO write and it extends the file size */
1701 if (loi->loi_lvb.lvb_size < last_off) {
1702 attr->cat_size = last_off;
1705 /* Extend KMS if it's not a lockless write */
1706 if (loi->loi_kms < last_off &&
1707 oap2osc_page(last)->ops_srvlock == 0) {
1708 attr->cat_kms = last_off;
1714 cl_object_attr_update(env, obj, attr, valid);
1715 cl_object_attr_unlock(obj);
1717 OBDO_FREE(aa->aa_oa);
1719 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1720 osc_inc_unstable_pages(req);
/* Finish every extent carried by this RPC; lists must end empty. */
1722 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1723 list_del_init(&ext->oe_link);
1724 osc_extent_finish(env, ext, 1, rc);
1726 LASSERT(list_empty(&aa->aa_exts));
1727 LASSERT(list_empty(&aa->aa_oaps));
1729 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1730 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1732 spin_lock(&cli->cl_loi_list_lock);
1733 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1734 * is called so we know whether to go to sync BRWs or wait for more
1735 * RPCs to complete */
1736 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1737 cli->cl_w_in_flight--;
1739 cli->cl_r_in_flight--;
1740 osc_wake_cache_waiters(cli);
1741 spin_unlock(&cli->cl_loi_list_lock);
1743 osc_io_unplug(env, cli, NULL);
/*
 * Commit callback for BRW requests (rq_commit_cb).
 *
 * Under rq_lock: if the request still holds unstable pages, clear the
 * flag and release them outside the lock; otherwise just mark the
 * request committed.  The lock/flag dance avoids a race with
 * osc_inc_unstable_pages (see comment below).
 */
1749 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1750 * this called via the rq_commit_cb, I need to ensure
1751 * osc_dec_unstable_pages is still called. Otherwise unstable
1752 * pages may be leaked. */
1753 spin_lock(&req->rq_lock);
1754 if (likely(req->rq_unstable)) {
1755 req->rq_unstable = 0;
1756 spin_unlock(&req->rq_lock);
1758 osc_dec_unstable_pages(req);
1760 req->rq_committed = 1;
1761 spin_unlock(&req->rq_lock);
1766 * Build an RPC by the list of extent @ext_list. The caller must ensure
1767 * that the total pages in this list are NOT over max pages per RPC.
1768 * Extents in the list must be in OES_RPC state.
/*
 * Assemble one BRW RPC from a list of osc_extents and submit it to
 * ptlrpcd.
 *
 * Collects all pages from the extents into a brw_page array, sorts them
 * by offset, fills in request attributes (twice: once before and once
 * after building the request, to get timestamps past any racing
 * setattr — see bug 10150), wires up the interpret/commit callbacks and
 * async args, bumps the in-flight counters, and queues the request.  On
 * failure every extent is finished with an error.
 *
 * NOTE(review): interior lines are elided in this excerpt (oa
 * allocation, i/page bookkeeping, RETURN paths, error-path labels) —
 * comments describe only the visible code.
 */
1770 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1771 struct list_head *ext_list, int cmd)
1773 struct ptlrpc_request *req = NULL;
1774 struct osc_extent *ext;
1775 struct brw_page **pga = NULL;
1776 struct osc_brw_async_args *aa = NULL;
1777 struct obdo *oa = NULL;
1778 struct osc_async_page *oap;
1779 struct osc_object *obj = NULL;
1780 struct cl_req_attr *crattr = NULL;
1781 loff_t starting_offset = OBD_OBJECT_EOF;
1782 loff_t ending_offset = 0;
1786 bool soft_sync = false;
1787 bool interrupted = false;
1791 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1792 struct ost_body *body;
1794 LASSERT(!list_empty(ext_list));
1796 /* add pages into rpc_list to build BRW rpc */
1797 list_for_each_entry(ext, ext_list, oe_link) {
1798 LASSERT(ext->oe_state == OES_RPC);
1799 mem_tight |= ext->oe_memalloc;
1800 grant += ext->oe_grants;
1801 page_count += ext->oe_nr_pages;
1806 soft_sync = osc_over_unstable_soft_limit(cli);
1808 mpflag = cfs_memory_pressure_get_and_set();
1810 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1812 GOTO(out, rc = -ENOMEM);
1816 GOTO(out, rc = -ENOMEM);
/* Flatten all extents' pages into pga[] and rpc_list, tracking the
 * overall [starting_offset, ending_offset) range of the RPC. */
1819 list_for_each_entry(ext, ext_list, oe_link) {
1820 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1822 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1824 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1825 pga[i] = &oap->oap_brw_page;
1826 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1829 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1830 if (starting_offset == OBD_OBJECT_EOF ||
1831 starting_offset > oap->oap_obj_off)
1832 starting_offset = oap->oap_obj_off;
1834 LASSERT(oap->oap_page_off == 0);
1835 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1836 ending_offset = oap->oap_obj_off +
1839 LASSERT(oap->oap_page_off + oap->oap_count ==
1841 if (oap->oap_interrupted)
1846 /* first page in the list */
1847 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
/* First attr pass: fill oa with everything (cra_flags = ~0ULL). */
1849 crattr = &osc_env_info(env)->oti_req_attr;
1850 memset(crattr, 0, sizeof(*crattr));
1851 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1852 crattr->cra_flags = ~0ULL;
1853 crattr->cra_page = oap2cl_page(oap);
1854 crattr->cra_oa = oa;
1855 cl_req_attr_set(env, osc2cl(obj), crattr);
1857 if (cmd == OBD_BRW_WRITE)
1858 oa->o_grant_used = grant;
1860 sort_brw_pages(pga, page_count);
1861 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1863 CERROR("prep_req failed: %d\n", rc);
1867 req->rq_commit_cb = brw_commit;
1868 req->rq_interpret_reply = brw_interpret;
1869 req->rq_memalloc = mem_tight != 0;
1870 oap->oap_request = ptlrpc_request_addref(req);
1871 if (interrupted && !req->rq_intr)
1872 ptlrpc_mark_interrupted(req);
1874 /* Need to update the timestamps after the request is built in case
1875 * we race with setattr (locally or in queue at OST). If OST gets
1876 * later setattr before earlier BRW (as determined by the request xid),
1877 * the OST will not use BRW timestamps. Sadly, there is no obvious
1878 * way to do this in a single call. bug 10150 */
1879 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1880 crattr->cra_oa = &body->oa;
1881 crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1882 cl_req_attr_set(env, osc2cl(obj), crattr);
1883 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
/* Hand ownership of rpc_list (oaps) and ext_list (extents) to aa. */
1885 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1886 aa = ptlrpc_req_async_args(req);
1887 INIT_LIST_HEAD(&aa->aa_oaps);
1888 list_splice_init(&rpc_list, &aa->aa_oaps);
1889 INIT_LIST_HEAD(&aa->aa_exts);
1890 list_splice_init(ext_list, &aa->aa_exts);
/* Account the new RPC in flight and record lprocfs histograms. */
1892 spin_lock(&cli->cl_loi_list_lock);
1893 starting_offset >>= PAGE_SHIFT;
1894 if (cmd == OBD_BRW_READ) {
1895 cli->cl_r_in_flight++;
1896 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1897 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1898 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1899 starting_offset + 1);
1901 cli->cl_w_in_flight++;
1902 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1903 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1904 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1905 starting_offset + 1);
1907 spin_unlock(&cli->cl_loi_list_lock);
1909 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1910 page_count, aa, cli->cl_r_in_flight,
1911 cli->cl_w_in_flight);
1912 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
1914 ptlrpcd_add_req(req);
/* Error path: undo allocations and fail all extents. */
1920 cfs_memory_pressure_restore(mpflag);
1923 LASSERT(req == NULL);
1928 OBD_FREE(pga, sizeof(*pga) * page_count);
1929 /* this should happen rarely and is pretty bad, it makes the
1930 * pending list not follow the dirty order */
1931 while (!list_empty(ext_list)) {
1932 ext = list_entry(ext_list->next, struct osc_extent,
1934 list_del_init(&ext->oe_link);
1935 osc_extent_finish(env, ext, 0, rc);
/*
 * Attach @data to a DLM lock's l_ast_data if it is unset (or already set
 * to @data).  Performed under the lock's resource lock.
 * NOTE(review): the success/failure return statements are elided in this
 * excerpt; callers treat a non-zero return as "data set/matched".
 */
1941 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
1945 LASSERT(lock != NULL);
1947 lock_res_and_lock(lock);
1949 if (lock->l_ast_data == NULL)
1950 lock->l_ast_data = data;
1951 if (lock->l_ast_data == data)
1954 unlock_res_and_lock(lock);
/*
 * Common completion for a lock enqueue: translate an intent-aborted
 * reply into its real error code, mark the LVB ready when appropriate,
 * invoke the caller's upcall, and drop the enqueue reference on the lock
 * handle.
 */
1959 static int osc_enqueue_fini(struct ptlrpc_request *req,
1960 osc_enqueue_upcall_f upcall, void *cookie,
1961 struct lustre_handle *lockh, enum ldlm_mode mode,
1962 __u64 *flags, int agl, int errcode)
1964 bool intent = *flags & LDLM_FL_HAS_INTENT;
1968 /* The request was created before ldlm_cli_enqueue call. */
1969 if (intent && errcode == ELDLM_LOCK_ABORTED) {
1970 struct ldlm_reply *rep;
1972 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1973 LASSERT(rep != NULL);
/* lock_policy_res1 carries the intent's real status in network
 * byte order; it overrides the ABORTED code when non-zero. */
1975 rep->lock_policy_res1 =
1976 ptlrpc_status_ntoh(rep->lock_policy_res1);
1977 if (rep->lock_policy_res1)
1978 errcode = rep->lock_policy_res1;
1980 *flags |= LDLM_FL_LVB_READY;
1981 } else if (errcode == ELDLM_OK) {
1982 *flags |= LDLM_FL_LVB_READY;
1985 /* Call the update callback. */
1986 rc = (*upcall)(cookie, lockh, errcode);
1988 /* release the reference taken in ldlm_cli_enqueue() */
1989 if (errcode == ELDLM_LOCK_MATCHED)
1991 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1992 ldlm_lock_decref(lockh, mode);
/*
 * Reply-interpret callback for an asynchronous lock enqueue.
 *
 * Looks up the lock by handle, takes an extra reference so a blocking
 * AST posted by ldlm_cli_enqueue_fini() for a failed lock is guaranteed
 * to arrive after the upcall, completes the enqueue via
 * ldlm_cli_enqueue_fini() and osc_enqueue_fini(), then drops both the
 * extra reference and the handle2lock reference.
 *
 * NOTE(review): interior lines are elided in this excerpt (aa_agl
 * handling around 2030, flags declaration) — comments describe only the
 * visible code.
 */
1997 static int osc_enqueue_interpret(const struct lu_env *env,
1998 struct ptlrpc_request *req,
1999 struct osc_enqueue_args *aa, int rc)
2001 struct ldlm_lock *lock;
2002 struct lustre_handle *lockh = &aa->oa_lockh;
2003 enum ldlm_mode mode = aa->oa_mode;
2004 struct ost_lvb *lvb = aa->oa_lvb;
2005 __u32 lvb_len = sizeof(*lvb);
2010 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2012 lock = ldlm_handle2lock(lockh);
2013 LASSERTF(lock != NULL,
2014 "lockh %#llx, req %p, aa %p - client evicted?\n",
2015 lockh->cookie, req, aa);
2017 /* Take an additional reference so that a blocking AST that
2018 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2019 * to arrive after an upcall has been executed by
2020 * osc_enqueue_fini(). */
2021 ldlm_lock_addref(lockh, mode);
2023 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2024 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2026 /* Let CP AST to grant the lock first. */
2027 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* presumably the AGL branch: no LVB/flags were stored, use a local
 * flags variable instead — elided lines around here, TODO confirm */
2030 LASSERT(aa->oa_lvb == NULL);
2031 LASSERT(aa->oa_flags == NULL);
2032 aa->oa_flags = &flags;
2035 /* Complete obtaining the lock procedure. */
2036 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2037 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2039 /* Complete osc stuff. */
2040 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2041 aa->oa_flags, aa->oa_agl, rc);
2043 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2045 ldlm_lock_decref(lockh, mode);
2046 LDLM_LOCK_PUT(lock);
/* Sentinel request-set pointer: callers pass PTLRPCD_SET to mean "queue
 * via ptlrpcd" rather than a real set (see osc_enqueue_base). */
2050 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2052 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2053 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2054 * other synchronous requests, however keeping some locks and trying to obtain
2055 * others may take a considerable amount of time in a case of ost failure; and
2056 * when other sync requests do not get released lock from a client, the client
2057 * is evicted from the cluster -- such scenarious make the life difficult, so
2058 * release locks just after they are obtained. */
/*
 * Enqueue an extent lock on an OST object.
 *
 * First tries to match an already-cached lock (including a PW lock when
 * reading); on a match the upcall is invoked with ELDLM_LOCK_MATCHED.
 * Otherwise builds an LDLM_ENQUEUE_LVB request and either waits for it
 * synchronously or, when @async, attaches osc_enqueue_interpret and
 * queues it on @rqset (or ptlrpcd when rqset == PTLRPCD_SET).
 *
 * NOTE(review): interior lines are elided in this excerpt (kms_valid
 * check, AGL no-wait match flags, error returns) — comments describe
 * only the visible code.
 */
2059 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2060 __u64 *flags, union ldlm_policy_data *policy,
2061 struct ost_lvb *lvb, int kms_valid,
2062 osc_enqueue_upcall_f upcall, void *cookie,
2063 struct ldlm_enqueue_info *einfo,
2064 struct ptlrpc_request_set *rqset, int async, int agl)
2066 struct obd_device *obd = exp->exp_obd;
2067 struct lustre_handle lockh = { 0 };
2068 struct ptlrpc_request *req = NULL;
2069 int intent = *flags & LDLM_FL_HAS_INTENT;
2070 __u64 match_flags = *flags;
2071 enum ldlm_mode mode;
2075 /* Filesystem lock extents are extended to page boundaries so that
2076 * dealing with the page cache is a little smoother. */
2077 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2078 policy->l_extent.end |= ~PAGE_MASK;
2081 * kms is not valid when either object is completely fresh (so that no
2082 * locks are cached), or object was evicted. In the latter case cached
2083 * lock cannot be used, because it would prime inode state with
2084 * potentially stale LVB.
2089 /* Next, search for already existing extent locks that will cover us */
2090 /* If we're trying to read, we also search for an existing PW lock. The
2091 * VFS and page cache already protect us locally, so lots of readers/
2092 * writers can share a single PW lock.
2094 * There are problems with conversion deadlocks, so instead of
2095 * converting a read lock to a write lock, we'll just enqueue a new
2098 * At some point we should cancel the read lock instead of making them
2099 * send us a blocking callback, but there are problems with canceling
2100 * locks out from other users right now, too. */
2101 mode = einfo->ei_mode;
2102 if (einfo->ei_mode == LCK_PR)
2105 match_flags |= LDLM_FL_LVB_READY;
2107 match_flags |= LDLM_FL_BLOCK_GRANTED;
2108 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2109 einfo->ei_type, policy, mode, &lockh, 0);
2111 struct ldlm_lock *matched;
2113 if (*flags & LDLM_FL_TEST_LOCK)
2116 matched = ldlm_handle2lock(&lockh);
2118 /* AGL enqueues DLM locks speculatively. Therefore if
2119 * it already exists a DLM lock, it wll just inform the
2120 * caller to cancel the AGL process for this stripe. */
2121 ldlm_lock_decref(&lockh, mode);
2122 LDLM_LOCK_PUT(matched);
2124 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2125 *flags |= LDLM_FL_LVB_READY;
2127 /* We already have a lock, and it's referenced. */
2128 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2130 ldlm_lock_decref(&lockh, mode);
2131 LDLM_LOCK_PUT(matched);
2134 ldlm_lock_decref(&lockh, mode);
2135 LDLM_LOCK_PUT(matched);
2140 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
/* No cached lock: build a fresh enqueue request with an LVB buffer. */
2144 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2145 &RQF_LDLM_ENQUEUE_LVB);
2149 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2151 ptlrpc_request_free(req);
2155 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2157 ptlrpc_request_set_replen(req);
2160 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2161 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2163 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2164 sizeof(*lvb), LVB_T_OST, &lockh, async);
2167 struct osc_enqueue_args *aa;
2168 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2169 aa = ptlrpc_req_async_args(req);
2171 aa->oa_mode = einfo->ei_mode;
2172 aa->oa_type = einfo->ei_type;
2173 lustre_handle_copy(&aa->oa_lockh, &lockh);
2174 aa->oa_upcall = upcall;
2175 aa->oa_cookie = cookie;
2178 aa->oa_flags = flags;
2181 /* AGL is essentially to enqueue an DLM lock
2182 * in advance, so we don't care about the
2183 * result of AGL enqueue. */
2185 aa->oa_flags = NULL;
2188 req->rq_interpret_reply =
2189 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2190 if (rqset == PTLRPCD_SET)
2191 ptlrpcd_add_req(req);
2193 ptlrpc_set_add_req(rqset, req);
2194 } else if (intent) {
2195 ptlrpc_req_finished(req);
/* Synchronous path: complete the enqueue inline. */
2200 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2203 ptlrpc_req_finished(req);
/*
 * Match an existing cached extent lock on an OST object without
 * enqueueing a new one.  Extents are widened to page boundaries first.
 * On a match (and not TEST_LOCK), attaches @data to the lock via
 * osc_set_lock_data; if that fails, the reference is dropped.
 *
 * NOTE(review): interior lines are elided in this excerpt (the rc/mode
 * setup before ldlm_lock_match, return statements) — comments describe
 * only the visible code.
 */
2208 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2209 enum ldlm_type type, union ldlm_policy_data *policy,
2210 enum ldlm_mode mode, __u64 *flags, void *data,
2211 struct lustre_handle *lockh, int unref)
2213 struct obd_device *obd = exp->exp_obd;
2214 __u64 lflags = *flags;
2218 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2221 /* Filesystem lock extents are extended to page boundaries so that
2222 * dealing with the page cache is a little smoother */
2223 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2224 policy->l_extent.end |= ~PAGE_MASK;
2226 /* Next, search for already existing extent locks that will cover us */
2227 /* If we're trying to read, we also search for an existing PW lock. The
2228 * VFS and page cache already protect us locally, so lots of readers/
2229 * writers can share a single PW lock. */
2233 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2234 res_id, type, policy, rc, lockh, unref);
2235 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2239 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2241 LASSERT(lock != NULL);
2242 if (!osc_set_lock_data(lock, data)) {
2243 ldlm_lock_decref(lockh, rc);
2246 LDLM_LOCK_PUT(lock);
/*
 * Reply-interpret callback for an async OST_STATFS request.
 * Unpacks the obd_statfs from the reply into the caller's oi_osfs and
 * invokes the caller's oi_cb_up completion callback.  For NODELAY
 * requests, -ENOTCONN/-EAGAIN are handled specially (lines elided here).
 */
2251 static int osc_statfs_interpret(const struct lu_env *env,
2252 struct ptlrpc_request *req,
2253 struct osc_async_args *aa, int rc)
2255 struct obd_statfs *msfs;
2259 /* The request has in fact never been sent
2260 * due to issues at a higher level (LOV).
2261 * Exit immediately since the caller is
2262 * aware of the problem and takes care
2263 * of the clean up */
2266 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2267 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2273 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2275 GOTO(out, rc = -EPROTO);
/* Struct copy of the statfs result into the caller's buffer. */
2278 *aa->aa_oi->oi_osfs = *msfs;
2280 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Issue an asynchronous OST_STATFS request on @rqset, with
 * osc_statfs_interpret as the completion callback.  NODELAY requests
 * (typically from procfs) are marked no_resend/no_delay to avoid
 * blocking behind recovery.
 * NOTE(review): lines storing oinfo into aa are elided in this excerpt.
 */
2285 static int osc_statfs_async(struct obd_export *exp,
2286 struct obd_info *oinfo, __u64 max_age,
2287 struct ptlrpc_request_set *rqset)
2288 struct obd_device *obd = class_exp2obd(exp);
2289 struct ptlrpc_request *req;
2290 struct osc_async_args *aa;
2294 /* We could possibly pass max_age in the request (as an absolute
2295 * timestamp or a "seconds.usec ago") so the target can avoid doing
2296 * extra calls into the filesystem if that isn't necessary (e.g.
2297 * during mount that would help a bit). Having relative timestamps
2298 * is not so great if request processing is slow, while absolute
2299 * timestamps are not ideal because they need time synchronization. */
2300 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2304 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2306 ptlrpc_request_free(req);
2309 ptlrpc_request_set_replen(req);
/* statfs replies come through the OST_CREATE portal. */
2310 req->rq_request_portal = OST_CREATE_PORTAL;
2311 ptlrpc_at_set_req_timeout(req);
2313 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2314 /* procfs requests not want stat in wait for avoid deadlock */
2315 req->rq_no_resend = 1;
2316 req->rq_no_delay = 1;
2319 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2320 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2321 aa = ptlrpc_req_async_args(req);
2324 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: send the request and wait for the reply,
 * copying the obd_statfs result to @osfs (copy lines elided in this
 * excerpt).  The import pointer is snapshotted under cl_sem to
 * synchronize with client_disconnect_export (bug 15684).
 */
2328 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2329 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2331 struct obd_device *obd = class_exp2obd(exp);
2332 struct obd_statfs *msfs;
2333 struct ptlrpc_request *req;
2334 struct obd_import *imp = NULL;
2338 /*Since the request might also come from lprocfs, so we need
2339 *sync this with client_disconnect_export Bug15684*/
2340 down_read(&obd->u.cli.cl_sem);
2341 if (obd->u.cli.cl_import)
2342 imp = class_import_get(obd->u.cli.cl_import);
2343 up_read(&obd->u.cli.cl_sem);
2347 /* We could possibly pass max_age in the request (as an absolute
2348 * timestamp or a "seconds.usec ago") so the target can avoid doing
2349 * extra calls into the filesystem if that isn't necessary (e.g.
2350 * during mount that would help a bit). Having relative timestamps
2351 * is not so great if request processing is slow, while absolute
2352 * timestamps are not ideal because they need time synchronization. */
2353 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* The import reference is only needed for the allocation above. */
2355 class_import_put(imp);
2360 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2362 ptlrpc_request_free(req);
2365 ptlrpc_request_set_replen(req);
2366 req->rq_request_portal = OST_CREATE_PORTAL;
2367 ptlrpc_at_set_req_timeout(req);
2369 if (flags & OBD_STATFS_NODELAY) {
2370 /* procfs requests not want stat in wait for avoid deadlock */
2371 req->rq_no_resend = 1;
2372 req->rq_no_delay = 1;
2375 rc = ptlrpc_queue_wait(req);
2379 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2381 GOTO(out, rc = -EPROTO);
2388 ptlrpc_req_finished(req);
/*
 * OSC ioctl dispatcher.  Pins the module for the duration of the call,
 * then handles client recovery, import (de)activation and target ping;
 * anything else returns -ENOTTY.
 * NOTE(review): the switch(cmd) statement itself and some argument
 * lines are elided in this excerpt.
 */
2392 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2393 void *karg, void __user *uarg)
2395 struct obd_device *obd = exp->exp_obd;
2396 struct obd_ioctl_data *data = karg;
/* Prevent module unload while an ioctl is in progress. */
2400 if (!try_module_get(THIS_MODULE)) {
2401 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2402 module_name(THIS_MODULE));
2406 case OBD_IOC_CLIENT_RECOVER:
2407 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2408 data->ioc_inlbuf1, 0);
2412 case IOC_OSC_SET_ACTIVE:
2413 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2416 case OBD_IOC_PING_TARGET:
2417 err = ptlrpc_obd_ping(obd);
2420 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2421 cmd, current_comm());
2422 GOTO(out, err = -ENOTTY);
2425 module_put(THIS_MODULE);
/*
 * Handle "set info" keys for the OSC.  Locally handled keys (checksum,
 * sptlrpc config/context, shared cache attach, LRU shrink) are applied
 * directly; all other keys are forwarded to the OST via an OST_SET_INFO
 * RPC — through the caller's request set normally, or via ptlrpcd for
 * grant shrinking.
 */
2429 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2430 u32 keylen, void *key,
2431 u32 vallen, void *val,
2432 struct ptlrpc_request_set *set)
2434 struct ptlrpc_request *req;
2435 struct obd_device *obd = exp->exp_obd;
2436 struct obd_import *imp = class_exp2cliimp(exp);
2441 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* Enable/disable BRW data checksumming for this client. */
2443 if (KEY_IS(KEY_CHECKSUM)) {
2444 if (vallen != sizeof(int))
2446 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
/* Security flavour configuration changed; let sptlrpc adapt. */
2450 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2451 sptlrpc_conf_client_adapt(obd);
/* Flush this client's security contexts on the import. */
2455 if (KEY_IS(KEY_FLUSH_CTX)) {
2456 sptlrpc_import_flush_my_ctx(imp);
/* Attach the shared client page cache (set exactly once) and hook
 * this OSC into the cache's LRU list. */
2460 if (KEY_IS(KEY_CACHE_SET)) {
2461 struct client_obd *cli = &obd->u.cli;
2463 LASSERT(cli->cl_cache == NULL); /* only once */
2464 cli->cl_cache = (struct cl_client_cache *)val;
2465 cl_cache_incref(cli->cl_cache);
2466 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2468 /* add this osc into entity list */
2469 LASSERT(list_empty(&cli->cl_lru_osc));
2470 spin_lock(&cli->cl_cache->ccc_lru_lock);
2471 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2472 spin_unlock(&cli->cl_cache->ccc_lru_lock);
/* Shrink at most half of this OSC's in-LRU pages, capped by the
 * caller-requested target. */
2477 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2478 struct client_obd *cli = &obd->u.cli;
2479 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2480 long target = *(long *)val;
2482 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* Every remaining key except grant shrink needs the caller's set. */
2487 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2490 /* We pass all other commands directly to OST. Since nobody calls osc
2491 methods directly and everybody is supposed to go through LOV, we
2492 assume lov checked invalid values for us.
2493 The only recognised values so far are evict_by_nid and mds_conn.
2494 Even if something bad goes through, we'd get a -EINVAL from OST
2497 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2498 &RQF_OST_SET_GRANT_INFO :
2503 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2504 RCL_CLIENT, keylen);
2505 if (!KEY_IS(KEY_GRANT_SHRINK))
2506 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2507 RCL_CLIENT, vallen);
2508 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2510 ptlrpc_request_free(req);
/* Copy key (and value) into the request buffers. */
2514 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2515 memcpy(tmp, key, keylen);
2516 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2519 memcpy(tmp, val, vallen);
/* Grant shrink carries async args; the reply is consumed by
 * osc_shrink_grant_interpret(). */
2521 if (KEY_IS(KEY_GRANT_SHRINK)) {
2522 struct osc_grant_args *aa;
2525 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2526 aa = ptlrpc_req_async_args(req);
2529 ptlrpc_req_finished(req);
2532 *oa = ((struct ost_body *)val)->oa;
2534 req->rq_interpret_reply = osc_shrink_grant_interpret;
2537 ptlrpc_request_set_replen(req);
2538 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2539 LASSERT(set != NULL);
2540 ptlrpc_set_add_req(set, req);
2541 ptlrpc_check_set(NULL, set);
/* Grant shrink is queued to ptlrpcd instead of the caller's set. */
2543 ptlrpcd_add_req(req);
/*
 * Recompute the grant to request from the OST on (re)connect: available
 * + reserved grant, plus dirty accounting (bytes when GRANT_PARAM is
 * negotiated, pages otherwise), defaulting to two full BRW RPCs when
 * the sum is zero.  Lost grant is consumed and reset under
 * cl_loi_list_lock.
 */
2549 static int osc_reconnect(const struct lu_env *env,
2550 struct obd_export *exp, struct obd_device *obd,
2551 struct obd_uuid *cluuid,
2552 struct obd_connect_data *data,
2555 struct client_obd *cli = &obd->u.cli;
2557 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2561 spin_lock(&cli->cl_loi_list_lock);
2562 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2563 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2564 grant += cli->cl_dirty_grant;
2566 grant += cli->cl_dirty_pages << PAGE_SHIFT;
/* Never ask for zero grant; fall back to two BRW RPCs worth. */
2567 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2568 lost_grant = cli->cl_lost_grant;
2569 cli->cl_lost_grant = 0;
2570 spin_unlock(&cli->cl_loi_list_lock);
2572 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2573 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2574 data->ocd_version, data->ocd_grant, lost_grant);
/*
 * Disconnect the export; only after the import is known destroyed is
 * this client removed from the grant-shrink list (ordering rationale
 * in the comment below).
 */
2580 static int osc_disconnect(struct obd_export *exp)
2582 struct obd_device *obd = class_exp2obd(exp);
2585 rc = client_disconnect_export(exp);
2587 * Initially we put del_shrink_grant before disconnect_export, but it
2588 * causes the following problem if setup (connect) and cleanup
2589 * (disconnect) are tangled together.
2590 * connect p1 disconnect p2
2591 * ptlrpc_connect_import
2592 * ............... class_manual_cleanup
2595 * ptlrpc_connect_interrupt
2597 * add this client to shrink list
2599 * Bang! pinger trigger the shrink.
2600 * So the osc should be disconnected from the shrink list, after we
2601 * are sure the import has been destroyed. BUG18662
2603 if (obd->u.cli.cl_import == NULL)
2604 osc_del_shrink_grant(&obd->u.cli);
/*
 * cfs_hash iterator callback (one LDLM resource per invocation): take a
 * reference on the osc object attached to the first granted lock found,
 * clear LDLM_FL_CLEANED on every granted lock, then invalidate the osc
 * object and drop the reference.
 */
2608 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2609 struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2611 struct lu_env *env = arg;
2612 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2613 struct ldlm_lock *lock;
2614 struct osc_object *osc = NULL;
2618 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
/* Only the first lock carrying ast_data is needed to reach the
 * resource's osc object; pin it until after the loop. */
2619 if (lock->l_ast_data != NULL && osc == NULL) {
2620 osc = lock->l_ast_data;
2621 cl_object_get(osc2cl(osc));
2624 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2625 * by the 2nd round of ldlm_namespace_clean() call in
2626 * osc_import_event(). */
2627 ldlm_clear_cleaned(lock);
2632 osc_object_invalidate(env, osc);
2633 cl_object_put(env, osc2cl(osc));
/*
 * Import state-change dispatcher: adjust grant accounting, invalidate
 * cached locks/objects, and forward state notifications to the
 * observer obd.
 */
2639 static int osc_import_event(struct obd_device *obd,
2640 struct obd_import *imp,
2641 enum obd_import_event event)
2643 struct client_obd *cli;
2647 LASSERT(imp->imp_obd == obd);
/* Connection lost: any outstanding grant is void. */
2650 case IMP_EVENT_DISCON: {
2652 spin_lock(&cli->cl_loi_list_lock);
2653 cli->cl_avail_grant = 0;
2654 cli->cl_lost_grant = 0;
2655 spin_unlock(&cli->cl_loi_list_lock);
2658 case IMP_EVENT_INACTIVE: {
2659 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
/* Import invalidated: clean local locks, flush pending I/O,
 * invalidate cached osc objects, then clean again to catch locks
 * whose CLEANED flag was reset (see osc_ldlm_resource_invalidate). */
2662 case IMP_EVENT_INVALIDATE: {
2663 struct ldlm_namespace *ns = obd->obd_namespace;
2667 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2669 env = cl_env_get(&refcheck);
2671 osc_io_unplug(env, &obd->u.cli, NULL);
2673 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2674 osc_ldlm_resource_invalidate,
2676 cl_env_put(env, &refcheck);
2678 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2683 case IMP_EVENT_ACTIVE: {
2684 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
/* Connect data negotiated: (re)initialize grant and pick the
 * request portal the server advertised. */
2687 case IMP_EVENT_OCD: {
2688 struct obd_connect_data *ocd = &imp->imp_connect_data;
2690 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2691 osc_init_grant(&obd->u.cli, ocd);
2694 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2695 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2697 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2700 case IMP_EVENT_DEACTIVATE: {
2701 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2704 case IMP_EVENT_ACTIVATE: {
2705 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2709 CERROR("Unknown import event %d\n", event);
/**
2716 * Determine whether the lock can be canceled before replaying the lock
2717 * during recovery, see bug16774 for detailed information.
2719 * \retval zero the lock can't be canceled
2720 * \retval other ok to cancel
 */
2722 static int osc_cancel_weight(struct ldlm_lock *lock)
/*
2725 * Cancel all unused and granted extent lock.
 */
/* Granted mode equals requested mode and a zero weight (no pages
 * covered) means the lock is cheap to cancel rather than replay. */
2727 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2728 lock->l_granted_mode == lock->l_req_mode &&
2729 osc_ldlm_weigh_ast(lock) == 0)
/*
 * ptlrpcd work callback: flush any pending writeback for this client
 * obd (data is the client_obd registered in osc_setup()).
 */
2735 static int brw_queue_work(const struct lu_env *env, void *data)
2737 struct client_obd *cli = data;
2739 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2741 osc_io_unplug(env, cli, NULL);
/*
 * Set up one OSC obd device: generic client setup, ptlrpcd writeback
 * and LRU work items, quota, procfs wiring, request-pool growth, grant
 * shrink registration, and membership in the global shrinker list.
 * Errors unwind through the labels at the bottom.
 */
2745 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2747 struct client_obd *cli = &obd->u.cli;
2748 struct obd_type *type;
2756 rc = ptlrpcd_addref();
2760 rc = client_obd_setup(obd, lcfg);
2762 GOTO(out_ptlrpcd, rc);
/* Async work item that flushes pending writeback via ptlrpcd. */
2764 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2765 if (IS_ERR(handler))
2766 GOTO(out_client_setup, rc = PTR_ERR(handler));
2767 cli->cl_writeback_work = handler;
/* Async work item that trims this client's page LRU. */
2769 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2770 if (IS_ERR(handler))
2771 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2772 cli->cl_lru_work = handler;
2774 rc = osc_quota_setup(obd);
2776 GOTO(out_ptlrpcd_work, rc);
2778 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2780 #ifdef CONFIG_PROC_FS
2781 obd->obd_vars = lprocfs_osc_obd_vars;
2783 /* If this is true then both client (osc) and server (osp) are on the
2784 * same node. The osp layer if loaded first will register the osc proc
2785 * directory. In that case this obd_device will be attached its proc
2786 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2787 type = class_search_type(LUSTRE_OSP_NAME);
2788 if (type && type->typ_procsym) {
2789 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2791 obd->obd_vars, obd);
2792 if (IS_ERR(obd->obd_proc_entry)) {
2793 rc = PTR_ERR(obd->obd_proc_entry);
2794 CERROR("error %d setting up lprocfs for %s\n", rc,
/* procfs failure is non-fatal; continue without the entry. */
2796 obd->obd_proc_entry = NULL;
2799 rc = lprocfs_obd_setup(obd);
2802 /* If the basic OSC proc tree construction succeeded then
2803 * lets do the rest. */
2805 lproc_osc_attach_seqstat(obd);
2806 sptlrpc_lprocfs_cliobd_attach(obd);
2807 ptlrpc_lprocfs_register_obd(obd);
/*
2811 * We try to control the total number of requests with a upper limit
2812 * osc_reqpool_maxreqcount. There might be some race which will cause
2813 * over-limit allocation, but it is fine.
 */
2815 req_count = atomic_read(&osc_pool_req_count);
2816 if (req_count < osc_reqpool_maxreqcount) {
/* Grow the shared pool by max_rpcs_in_flight + 2, clamped so the
 * global count does not exceed the maximum. */
2817 adding = cli->cl_max_rpcs_in_flight + 2;
2818 if (req_count + adding > osc_reqpool_maxreqcount)
2819 adding = osc_reqpool_maxreqcount - req_count;
2821 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2822 atomic_add(added, &osc_pool_req_count);
2825 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
/* Let LDLM weigh cancellation candidates via osc_cancel_weight(). */
2826 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2828 spin_lock(&osc_shrink_lock);
2829 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2830 spin_unlock(&osc_shrink_lock);
/* Error unwinding: destroy work items, then generic client state. */
2835 if (cli->cl_writeback_work != NULL) {
2836 ptlrpcd_destroy_work(cli->cl_writeback_work);
2837 cli->cl_writeback_work = NULL;
2839 if (cli->cl_lru_work != NULL) {
2840 ptlrpcd_destroy_work(cli->cl_lru_work);
2841 cli->cl_lru_work = NULL;
2844 client_obd_cleanup(obd);
/*
 * First-stage teardown: wait out zombie exports, destroy the ptlrpcd
 * work items, then drop the client import and procfs registrations.
 */
2850 static int osc_precleanup(struct obd_device *obd)
2852 struct client_obd *cli = &obd->u.cli;
2856 * for echo client, export may be on zombie list, wait for
2857 * zombie thread to cull it, because cli.cl_import will be
2858 * cleared in client_disconnect_export():
2859 * class_export_destroy() -> obd_cleanup() ->
2860 * echo_device_free() -> echo_client_cleanup() ->
2861 * obd_disconnect() -> osc_disconnect() ->
2862 * client_disconnect_export()
2864 obd_zombie_barrier();
2865 if (cli->cl_writeback_work) {
2866 ptlrpcd_destroy_work(cli->cl_writeback_work);
2867 cli->cl_writeback_work = NULL;
2870 if (cli->cl_lru_work) {
2871 ptlrpcd_destroy_work(cli->cl_lru_work);
2872 cli->cl_lru_work = NULL;
2875 obd_cleanup_client_import(obd);
2876 ptlrpc_lprocfs_unregister_obd(obd);
2877 lprocfs_obd_cleanup(obd);
2881 int osc_cleanup(struct obd_device *obd)
2883 struct client_obd *cli = &obd->u.cli;
2888 spin_lock(&osc_shrink_lock);
2889 list_del(&cli->cl_shrink_list);
2890 spin_unlock(&osc_shrink_lock);
2893 if (cli->cl_cache != NULL) {
2894 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2895 spin_lock(&cli->cl_cache->ccc_lru_lock);
2896 list_del_init(&cli->cl_lru_osc);
2897 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2898 cli->cl_lru_left = NULL;
2899 cl_cache_decref(cli->cl_cache);
2900 cli->cl_cache = NULL;
2903 /* free memory of osc quota cache */
2904 osc_quota_cleanup(obd);
2906 rc = client_obd_cleanup(obd);
/*
 * Apply one proc-style configuration parameter to this obd; a positive
 * return from class_process_proc_param() means "handled" and is mapped
 * to success.
 */
2912 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2914 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2915 return rc > 0 ? 0: rc;
/* obd_ops adapter: forward the config buffer to osc_process_config_base(). */
2918 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2920 return osc_process_config_base(obd, buf);
/*
 * Method table wiring the generic obd interface to the OSC
 * implementations; connection management is delegated to the generic
 * client_* helpers.
 */
2923 static struct obd_ops osc_obd_ops = {
2924 .o_owner = THIS_MODULE,
2925 .o_setup = osc_setup,
2926 .o_precleanup = osc_precleanup,
2927 .o_cleanup = osc_cleanup,
2928 .o_add_conn = client_import_add_conn,
2929 .o_del_conn = client_import_del_conn,
2930 .o_connect = client_connect_import,
2931 .o_reconnect = osc_reconnect,
2932 .o_disconnect = osc_disconnect,
2933 .o_statfs = osc_statfs,
2934 .o_statfs_async = osc_statfs_async,
2935 .o_create = osc_create,
2936 .o_destroy = osc_destroy,
2937 .o_getattr = osc_getattr,
2938 .o_setattr = osc_setattr,
2939 .o_iocontrol = osc_iocontrol,
2940 .o_set_info_async = osc_set_info_async,
2941 .o_import_event = osc_import_event,
2942 .o_process_config = osc_process_config,
2943 .o_quotactl = osc_quotactl,
/* Registered cache shrinker, plus the global list of OSC clients whose
 * page LRUs the shrinker may trim; the list is protected by
 * osc_shrink_lock. */
2946 static struct shrinker *osc_cache_shrinker;
2947 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
2948 DEFINE_SPINLOCK(osc_shrink_lock);
2950 #ifndef HAVE_SHRINKER_COUNT
/*
 * Compat shim for kernels with the single-callback shrinker API:
 * package the legacy arguments into a shrink_control, run the scan
 * pass, then report the remaining count.
 */
2951 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2953 struct shrink_control scv = {
2954 .nr_to_scan = shrink_param(sc, nr_to_scan),
2955 .gfp_mask = shrink_param(sc, gfp_mask)
/* Very old kernels pass no shrinker pointer at all. */
2957 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2958 struct shrinker *shrinker = NULL;
2961 (void)osc_cache_shrink_scan(shrinker, &scv);
2963 return osc_cache_shrink_count(shrinker, &scv);
/*
 * Module init: register kmem caches and the OSC obd type, install the
 * cache shrinker, and size/create the shared request pool
 * (osc_reqpool_mem_max megabytes divided into OST_IO_MAXREQSIZE-class
 * requests).
 */
2967 static int __init osc_init(void)
2969 bool enable_proc = true;
2970 struct obd_type *type;
2971 unsigned int reqpool_size;
2972 unsigned int reqsize;
2974 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
2975 osc_cache_shrink_count, osc_cache_shrink_scan);
2978 /* print an address of _any_ initialized kernel symbol from this
2979 * module, to allow debugging with gdb that doesn't support data
2980 * symbols from modules.*/
2981 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2983 rc = lu_kmem_init(osc_caches);
/* If OSP is already loaded it owns the shared proc directory. */
2987 type = class_search_type(LUSTRE_OSP_NAME);
2988 if (type != NULL && type->typ_procsym != NULL)
2989 enable_proc = false;
2991 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2992 LUSTRE_OSC_NAME, &osc_device_type);
2996 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
2998 /* This is obviously too much memory, only prevent overflow here */
2999 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3000 GOTO(out_type, rc = -EINVAL);
3002 reqpool_size = osc_reqpool_mem_max << 20;
/* Round the per-request size up to the next power of two that
 * covers OST_IO_MAXREQSIZE. */
3005 while (reqsize < OST_IO_MAXREQSIZE)
3006 reqsize = reqsize << 1;
/*
3009 * We don't enlarge the request count in OSC pool according to
3010 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3011 * tried after normal allocation failed. So a small OSC pool won't
3012 * cause much performance degradation in most of cases.
 */
3014 osc_reqpool_maxreqcount = reqpool_size / reqsize;
3016 atomic_set(&osc_pool_req_count, 0);
3017 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3018 ptlrpc_add_rqs_to_pool);
3020 if (osc_rq_pool != NULL)
/* Error unwinding. */
3024 class_unregister_type(LUSTRE_OSC_NAME);
3026 lu_kmem_fini(osc_caches);
/* Module unload: unwind osc_init() — shrinker, obd type, caches, pool. */
3031 static void __exit osc_exit(void)
3033 remove_shrinker(osc_cache_shrinker);
3034 class_unregister_type(LUSTRE_OSC_NAME);
3035 lu_kmem_fini(osc_caches);
3036 ptlrpc_free_rq_pool(osc_rq_pool);
/* Standard kernel module metadata and entry/exit registration. */
3039 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3040 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3041 MODULE_VERSION(LUSTRE_VERSION_STRING);
3042 MODULE_LICENSE("GPL");
3044 module_init(osc_init);
3045 module_exit(osc_exit);