4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2015, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
53 #include <lustre_net.h>
54 #include "osc_internal.h"
55 #include "osc_cl_internal.h"
/*
 * Module-scope state for the shared OSC request pool: the live request
 * counter, the configured maximum request count, and the pool itself.
 * The pool memory cap below is settable at module load time only
 * (permissions 0444 make it read-only through sysfs afterwards).
 */
57 atomic_t osc_pool_req_count;
58 unsigned int osc_reqpool_maxreqcount;
59 struct ptlrpc_request_pool *osc_rq_pool;
61 /* max memory used for request pool, unit is MB */
62 static unsigned int osc_reqpool_mem_max = 5;
63 module_param(osc_reqpool_mem_max, uint, 0444);
/*
 * Per-request async argument blocks.  Each one is stored inline in a
 * request's rq_async_args area (see the CLASSERT(sizeof(*x) <=
 * sizeof(req->rq_async_args)) checks further down) and is recovered by
 * the matching rq_interpret_reply callback when the reply arrives.
 * NOTE(review): this listing omits some fields of each struct.
 */
65 struct osc_brw_async_args {
71 struct brw_page **aa_ppga;
72 struct client_obd *aa_cli;
73 struct list_head aa_oaps;
74 struct list_head aa_exts;
/* Grant-shrink RPCs reuse the BRW async-argument layout. */
77 #define osc_grant_args osc_brw_async_args
/* Arguments carried by OST_SETATTR / OST_PUNCH requests. */
79 struct osc_setattr_args {
81 obd_enqueue_update_f sa_upcall;
/* Arguments carried by OST_SYNC requests. */
85 struct osc_fsync_args {
86 struct osc_object *fa_obj;
88 obd_enqueue_update_f fa_upcall;
/* Arguments preserved across an asynchronous DLM lock enqueue. */
92 struct osc_enqueue_args {
93 struct obd_export *oa_exp;
94 enum ldlm_type oa_type;
95 enum ldlm_mode oa_mode;
97 osc_enqueue_upcall_f oa_upcall;
99 struct ost_lvb *oa_lvb;
100 struct lustre_handle oa_lockh;
101 unsigned int oa_agl:1;
/* Forward declarations for helpers defined later in this file. */
104 static void osc_release_ppga(struct brw_page **ppga, size_t count);
105 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/*
 * Pack the caller's @oa into the OST_BODY field of @req's request
 * buffer, converting to wire format according to the import's connect
 * data (lustre_set_wire_obdo handles the on-the-wire conversion).
 */
108 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
110 struct ost_body *body;
112 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
115 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/*
 * Synchronous OST_GETATTR: pack @oa into a freshly allocated request,
 * queue it and wait for the reply, then copy the returned attributes
 * back into @oa.  The block size is filled in locally from the
 * export's BRW size rather than taken from the server reply.
 */
118 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
121 struct ptlrpc_request *req;
122 struct ost_body *body;
126 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
130 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* packing failed: release the unpacked request and bail out */
132 ptlrpc_request_free(req);
136 osc_pack_req_body(req, oa);
138 ptlrpc_request_set_replen(req);
140 rc = ptlrpc_queue_wait(req);
144 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* reply did not contain a valid ost_body: protocol error */
146 GOTO(out, rc = -EPROTO);
148 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
/* convert the reply attributes from wire format into the caller's obdo */
149 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
151 oa->o_blksize = cli_brw_size(exp->exp_obd);
152 oa->o_valid |= OBD_MD_FLBLKSZ;
156 ptlrpc_req_finished(req);
/*
 * Synchronous OST_SETATTR: send @oa's attributes to the OST and wait.
 * On success the attributes from the reply are copied back into @oa.
 * The caller must have set a valid group in @oa (asserted below).
 */
161 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
164 struct ptlrpc_request *req;
165 struct ost_body *body;
169 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
171 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
175 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* packing failed: release the unpacked request and bail out */
177 ptlrpc_request_free(req);
181 osc_pack_req_body(req, oa);
183 ptlrpc_request_set_replen(req);
185 rc = ptlrpc_queue_wait(req);
189 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* reply did not contain a valid ost_body: protocol error */
191 GOTO(out, rc = -EPROTO);
/* copy the server's view of the attributes back to the caller */
193 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
197 ptlrpc_req_finished(req);
/*
 * Reply interpreter shared by OST_SETATTR and OST_PUNCH requests
 * (installed as rq_interpret_reply).  On success, unpack the returned
 * attributes into sa->sa_oa, then forward the final status to the
 * caller's upcall with its cookie.
 */
202 static int osc_setattr_interpret(const struct lu_env *env,
203 struct ptlrpc_request *req,
204 struct osc_setattr_args *sa, int rc)
206 struct ost_body *body;
212 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* reply did not contain a valid ost_body: protocol error */
214 GOTO(out, rc = -EPROTO);
216 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
/* notify the waiter, whatever rc ended up being */
219 rc = sa->sa_upcall(sa->sa_cookie, rc);
/*
 * Issue an OST_SETATTR asynchronously.  The reply is handled by
 * osc_setattr_interpret(), which reports completion through
 * @upcall(@cookie, rc).  Depending on @rqset the request is either
 * handed straight to ptlrpcd or added to the caller's request set.
 * NOTE(review): this listing omits the if/else branches that select
 * between the ptlrpcd_add_req()/ptlrpc_set_add_req() paths — confirm
 * against the full source.
 */
223 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
224 obd_enqueue_update_f upcall, void *cookie,
225 struct ptlrpc_request_set *rqset)
227 struct ptlrpc_request *req;
228 struct osc_setattr_args *sa;
233 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
237 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* packing failed: release the unpacked request and bail out */
239 ptlrpc_request_free(req);
243 osc_pack_req_body(req, oa);
245 ptlrpc_request_set_replen(req);
247 /* do mds to ost setattr asynchronously */
249 /* Do not wait for response. */
250 ptlrpcd_add_req(req);
252 req->rq_interpret_reply =
253 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* async args live inside the request itself; verify they fit */
255 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
256 sa = ptlrpc_req_async_args(req);
258 sa->sa_upcall = upcall;
259 sa->sa_cookie = cookie;
261 if (rqset == PTLRPCD_SET)
262 ptlrpcd_add_req(req);
264 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_CREATE.  In this code path creates are only issued
 * for echo-client objects — the LASSERT below requires an echo
 * sequence in oa->o_oi.  On success the created object's attributes
 * are copied back into @oa and the block size filled in locally.
 */
270 static int osc_create(const struct lu_env *env, struct obd_export *exp,
273 struct ptlrpc_request *req;
274 struct ost_body *body;
279 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
280 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
282 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
284 GOTO(out, rc = -ENOMEM);
286 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
/* packing failed: release the unpacked request and bail out */
288 ptlrpc_request_free(req);
292 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
295 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
297 ptlrpc_request_set_replen(req);
299 rc = ptlrpc_queue_wait(req);
303 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* reply did not contain a valid ost_body: protocol error */
305 GOTO(out_req, rc = -EPROTO);
307 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
308 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
310 oa->o_blksize = cli_brw_size(exp->exp_obd);
311 oa->o_valid |= OBD_MD_FLBLKSZ;
313 CDEBUG(D_HA, "transno: "LPD64"\n",
314 lustre_msg_get_transno(req->rq_repmsg));
316 ptlrpc_req_finished(req);
/*
 * Issue an asynchronous OST_PUNCH (truncate/hole-punch) for the object
 * described by @oa.  Completion is reported through @upcall(@cookie,
 * rc) via osc_setattr_interpret().  The request goes out on the OST
 * I/O portal (see bug 7198) so it queues with other I/O RPCs.
 */
321 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
322 obd_enqueue_update_f upcall, void *cookie,
323 struct ptlrpc_request_set *rqset)
325 struct ptlrpc_request *req;
326 struct osc_setattr_args *sa;
327 struct ost_body *body;
331 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
335 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
/* packing failed: release the unpacked request and bail out */
337 ptlrpc_request_free(req);
340 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
341 ptlrpc_at_set_req_timeout(req);
343 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
345 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
347 ptlrpc_request_set_replen(req);
/* punch replies are handled by the common setattr interpreter */
349 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
350 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
351 sa = ptlrpc_req_async_args(req);
353 sa->sa_upcall = upcall;
354 sa->sa_cookie = cookie;
355 if (rqset == PTLRPCD_SET)
356 ptlrpcd_add_req(req);
358 ptlrpc_set_add_req(rqset, req);
/*
 * Reply interpreter for OST_SYNC: copy the returned obdo into the
 * caller's buffer, refresh the osc object's blocks attribute under the
 * cl_object attr lock, then invoke the fsync upcall with the status.
 */
363 static int osc_sync_interpret(const struct lu_env *env,
364 struct ptlrpc_request *req,
367 struct osc_fsync_args *fa = arg;
368 struct ost_body *body;
369 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
370 unsigned long valid = 0;
371 struct cl_object *obj;
377 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* reply did not contain a valid ost_body: protocol error */
379 CERROR("can't unpack ost_body\n");
380 GOTO(out, rc = -EPROTO);
383 *fa->fa_oa = body->oa;
384 obj = osc2cl(fa->fa_obj);
386 /* Update osc object's blocks attribute */
387 cl_object_attr_lock(obj);
388 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
389 attr->cat_blocks = body->oa.o_blocks;
394 cl_object_attr_update(env, obj, attr, valid);
395 cl_object_attr_unlock(obj);
/* notify the fsync waiter with the final status */
398 rc = fa->fa_upcall(fa->fa_cookie, rc);
/*
 * Issue an asynchronous OST_SYNC for @obj.  The byte range to flush is
 * carried in @oa's size/blocks fields (see the "overload" comment
 * below).  Completion is reported through @upcall(@cookie, rc) via
 * osc_sync_interpret().
 */
402 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
403 obd_enqueue_update_f upcall, void *cookie,
404 struct ptlrpc_request_set *rqset)
406 struct obd_export *exp = osc_export(obj);
407 struct ptlrpc_request *req;
408 struct ost_body *body;
409 struct osc_fsync_args *fa;
413 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
417 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
/* packing failed: release the unpacked request and bail out */
419 ptlrpc_request_free(req);
423 /* overload the size and blocks fields in the oa with start/end */
424 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
426 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
428 ptlrpc_request_set_replen(req);
429 req->rq_interpret_reply = osc_sync_interpret;
/* async args live inside the request itself; verify they fit */
431 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
432 fa = ptlrpc_req_async_args(req);
435 fa->fa_upcall = upcall;
436 fa->fa_cookie = cookie;
438 if (rqset == PTLRPCD_SET)
439 ptlrpcd_add_req(req);
441 ptlrpc_set_add_req(rqset, req);
446 /* Find and cancel locally locks matched by @mode in the resource found by
447 * @objid. Found locks are added into @cancel list. Returns the amount of
448 * locks added to @cancels list. */
449 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
450 struct list_head *cancels,
451 enum ldlm_mode mode, __u64 lock_flags)
453 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
454 struct ldlm_res_id res_id;
455 struct ldlm_resource *res;
459 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
460 * export) but disabled through procfs (flag in NS).
462 * This distinguishes from a case when ELC is not supported originally,
463 * when we still want to cancel locks in advance and just cancel them
464 * locally, without sending any RPC. */
465 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* look up the per-object ldlm resource and cancel matching local locks */
468 ostid_build_res_name(&oa->o_oi, &res_id);
469 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* hold a debug reference across the cancel scan */
473 LDLM_RESOURCE_ADDREF(res);
474 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
475 lock_flags, 0, NULL);
476 LDLM_RESOURCE_DELREF(res);
477 ldlm_resource_putref(res);
/*
 * Completion handler for OST_DESTROY: drop the in-flight destroy count
 * and wake anyone throttled in osc_destroy() waiting for a slot.
 */
481 static int osc_destroy_interpret(const struct lu_env *env,
482 struct ptlrpc_request *req, void *data,
485 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
487 atomic_dec(&cli->cl_destroy_in_flight);
488 wake_up(&cli->cl_destroy_waitq);
/*
 * Try to reserve a destroy-RPC slot: keep the incremented in-flight
 * count if it fits under cl_max_rpcs_in_flight; otherwise back the
 * increment out, and wake other waiters if the count changed between
 * the two atomic operations.
 * NOTE(review): the return statements are not visible in this listing.
 */
492 static int osc_can_send_destroy(struct client_obd *cli)
494 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
495 cli->cl_max_rpcs_in_flight) {
496 /* The destroy request can be sent */
499 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
500 cli->cl_max_rpcs_in_flight) {
502 * The counter has been modified between the two atomic
505 wake_up(&cli->cl_destroy_waitq);
/*
 * Send an OST_DESTROY for the object in @oa.  First cancels unused
 * local PW locks on the object (early lock cancellation) and
 * piggybacks them on the request via ldlm_prep_elc_req().  Destroys
 * are throttled to cl_max_rpcs_in_flight; once a slot is available the
 * request is handed to ptlrpcd without waiting for the reply.
 */
510 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
513 struct client_obd *cli = &exp->exp_obd->u.cli;
514 struct ptlrpc_request *req;
515 struct ost_body *body;
516 struct list_head cancels = LIST_HEAD_INIT(cancels);
521 CDEBUG(D_INFO, "oa NULL\n");
/* collect unused local PW locks so their data can be discarded */
525 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
526 LDLM_FL_DISCARD_DATA);
528 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* allocation failed: drop the collected cancel locks */
530 ldlm_lock_list_put(&cancels, l_bl_ast, count);
534 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
537 ptlrpc_request_free(req);
541 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
542 ptlrpc_at_set_req_timeout(req);
544 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
546 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
548 ptlrpc_request_set_replen(req);
550 req->rq_interpret_reply = osc_destroy_interpret;
551 if (!osc_can_send_destroy(cli)) {
552 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
555 * Wait until the number of on-going destroy RPCs drops
556 * under max_rpc_in_flight
558 l_wait_event_exclusive(cli->cl_destroy_waitq,
559 osc_can_send_destroy(cli), &lwi);
562 /* Do not wait for response */
563 ptlrpcd_add_req(req);
/*
 * Fill the grant/dirty accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) from client_obd state so every outgoing RPC
 * reports the client's cache usage to the server.  Inconsistent dirty
 * counters are reported via CERROR but do not fail the call.  All
 * counter reads happen under cl_loi_list_lock.
 */
567 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
570 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
572 LASSERT(!(oa->o_valid & bits));
575 spin_lock(&cli->cl_loi_list_lock);
/* with GRANT_PARAM servers, dirty is tracked in grant bytes directly */
576 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
577 oa->o_dirty = cli->cl_dirty_grant;
579 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
/* sanity checks on dirty accounting; log but continue on mismatch */
580 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
581 cli->cl_dirty_max_pages)) {
582 CERROR("dirty %lu - %lu > dirty_max %lu\n",
583 cli->cl_dirty_pages, cli->cl_dirty_transit,
584 cli->cl_dirty_max_pages);
586 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
587 atomic_long_read(&obd_dirty_transit_pages) >
588 (long)(obd_max_dirty_pages + 1))) {
589 /* The atomic_read() allowing the atomic_inc() are
590 * not covered by a lock thus they may safely race and trip
591 * this CERROR() unless we add in a small fudge factor (+1). */
592 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
593 cli_name(cli), atomic_long_read(&obd_dirty_pages),
594 atomic_long_read(&obd_dirty_transit_pages),
595 obd_max_dirty_pages);
597 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
599 CERROR("dirty %lu - dirty_max %lu too big???\n",
600 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
603 unsigned long nrpages;
/* normal case: ask to keep enough grant for a full set of RPCs in flight */
605 nrpages = cli->cl_max_pages_per_rpc;
606 nrpages *= cli->cl_max_rpcs_in_flight + 1;
607 nrpages = max(nrpages, cli->cl_dirty_max_pages);
608 oa->o_undirty = nrpages << PAGE_CACHE_SHIFT;
609 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
613 /* take extent tax into account when asking for more
615 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
616 cli->cl_max_extent_pages;
617 oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
/* report current grant and any grant lost since the last report */
620 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
621 oa->o_dropped = cli->cl_lost_grant;
622 cli->cl_lost_grant = 0;
623 spin_unlock(&cli->cl_loi_list_lock);
624 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
625 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/*
 * Reschedule the next grant-shrink check to cl_grant_shrink_interval
 * seconds from now.
 */
628 void osc_update_next_shrink(struct client_obd *cli)
630 cli->cl_next_shrink_grant =
631 cfs_time_shift(cli->cl_grant_shrink_interval);
632 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
633 cli->cl_next_shrink_grant);
/* Add @grant bytes to the client's available grant, under the LOI lock. */
636 static void __osc_update_grant(struct client_obd *cli, u64 grant)
638 spin_lock(&cli->cl_loi_list_lock);
639 cli->cl_avail_grant += grant;
640 spin_unlock(&cli->cl_loi_list_lock);
/* Apply any grant the server handed back in an RPC reply body. */
643 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
645 if (body->oa.o_valid & OBD_MD_FLGRANT) {
646 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
647 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: used by osc_shrink_grant_to_target() below. */
651 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
652 u32 keylen, void *key,
653 u32 vallen, void *val,
654 struct ptlrpc_request_set *set);
/*
 * Reply interpreter for a grant-shrink set_info RPC.  On failure the
 * grant that was deducted locally (stashed in the async args' obdo) is
 * restored; on success the grant from the server's reply is applied.
 */
656 static int osc_shrink_grant_interpret(const struct lu_env *env,
657 struct ptlrpc_request *req,
660 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
661 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
662 struct ost_body *body;
/* RPC failed: give the locally-deducted grant back */
665 __osc_update_grant(cli, oa->o_grant);
669 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
671 osc_update_grant(cli, body);
/*
 * Release a quarter of the client's available grant back to the
 * server by recording it in @oa (the caller packs @oa into an outgoing
 * RPC).  Marks the obdo with OBD_FL_SHRINK_GRANT so the server knows
 * this is a voluntary return, and reschedules the next shrink check.
 */
677 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
679 spin_lock(&cli->cl_loi_list_lock);
680 oa->o_grant = cli->cl_avail_grant / 4;
681 cli->cl_avail_grant -= oa->o_grant;
682 spin_unlock(&cli->cl_loi_list_lock);
/* make sure o_flags is valid before OR-ing in the shrink flag */
683 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
684 oa->o_valid |= OBD_MD_FLFLAGS;
687 oa->o_flags |= OBD_FL_SHRINK_GRANT;
688 osc_update_next_shrink(cli);
691 /* Shrink the current grant, either from some large amount to enough for a
692 * full set of in-flight RPCs, or if we have already shrunk to that limit
693 * then to enough for a single RPC. This avoids keeping more grant than
694 * needed, and avoids shrinking the grant piecemeal. */
695 static int osc_shrink_grant(struct client_obd *cli)
/* target: grant for (max_rpcs_in_flight + 1) full-size RPCs */
697 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
698 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
700 spin_lock(&cli->cl_loi_list_lock);
/* already at or below the multi-RPC target: fall back to a single RPC */
701 if (cli->cl_avail_grant <= target_bytes)
702 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
703 spin_unlock(&cli->cl_loi_list_lock);
705 return osc_shrink_grant_to_target(cli, target_bytes);
/*
 * Shrink the client's grant down to @target_bytes by sending the
 * difference back to the server in a KEY_GRANT_SHRINK set_info RPC.
 * The target is clamped to at least one full RPC's worth of grant, and
 * nothing is sent if we already hold less than the target.  If the
 * set_info call fails, the deducted grant is restored locally.
 */
708 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
711 struct ost_body *body;
714 spin_lock(&cli->cl_loi_list_lock);
715 /* Don't shrink if we are already above or below the desired limit
716 * We don't want to shrink below a single RPC, as that will negatively
717 * impact block allocation and long-term performance. */
718 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
719 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
721 if (target_bytes >= cli->cl_avail_grant) {
722 spin_unlock(&cli->cl_loi_list_lock);
725 spin_unlock(&cli->cl_loi_list_lock);
/* fill in current cache accounting before adjusting the grant */
731 osc_announce_cached(cli, &body->oa, 0);
733 spin_lock(&cli->cl_loi_list_lock);
/* everything above the target is returned to the server */
734 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
735 cli->cl_avail_grant = target_bytes;
736 spin_unlock(&cli->cl_loi_list_lock);
737 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
738 body->oa.o_valid |= OBD_MD_FLFLAGS;
739 body->oa.o_flags = 0;
741 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
742 osc_update_next_shrink(cli);
744 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
745 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
746 sizeof(*body), body, NULL);
/* on failure, restore the grant we deducted above */
748 __osc_update_grant(cli, body->oa.o_grant);
/*
 * Decide whether it is time to return grant to the server: requires
 * the server to support OBD_CONNECT_GRANT_SHRINK, the shrink interval
 * to have (nearly) elapsed, the import to be fully connected, and the
 * available grant to exceed one RPC's worth.  Nonzero means "shrink".
 */
753 static int osc_should_shrink_grant(struct client_obd *client)
755 cfs_time_t time = cfs_time_current();
756 cfs_time_t next_shrink = client->cl_next_shrink_grant;
758 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
759 OBD_CONNECT_GRANT_SHRINK) == 0)
/* allow a small (5-tick) margin so we don't miss the deadline */
762 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
763 /* Get the current RPC size directly, instead of going via:
764 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
765 * Keep comment here so that it can be found by searching. */
766 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
768 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
769 client->cl_avail_grant > brw_size)
/* not worth shrinking now: push the deadline out */
772 osc_update_next_shrink(client);
/*
 * Periodic timeout callback: walk every client registered on this
 * timeout item and shrink its grant if osc_should_shrink_grant() says
 * it is due.
 */
777 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
779 struct client_obd *client;
781 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
782 if (osc_should_shrink_grant(client))
783 osc_shrink_grant(client);
/*
 * Register @client with the shared grant-shrink timeout so the
 * callback above starts running for it, and schedule the first check.
 */
788 static int osc_add_shrink_grant(struct client_obd *client)
792 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
794 osc_grant_shrink_grant_cb, NULL,
795 &client->cl_grant_shrink_list);
797 CERROR("add grant client %s error %d\n", cli_name(client), rc);
800 CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
801 osc_update_next_shrink(client);
/* Unregister @client from the grant-shrink timeout. */
805 static int osc_del_shrink_grant(struct client_obd *client)
807 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/*
 * Initialize the client's grant state from the server's connect data:
 * compute cl_avail_grant (subtracting dirty/reserved grant unless the
 * import was evicted), derive the extent tax, chunk size, and maximum
 * extent size from the GRANT_PARAM parameters (with fixed defaults
 * otherwise), and register the grant-shrink timer when the server
 * supports OBD_CONNECT_GRANT_SHRINK.
 */
811 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
814 * ocd_grant is the total grant amount we're expect to hold: if we've
815 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
816 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
819 * race is tolerable here: if we're evicted, but imp_state already
820 * left EVICTED state, then cl_dirty_pages must be 0 already.
822 spin_lock(&cli->cl_loi_list_lock);
823 cli->cl_avail_grant = ocd->ocd_grant;
824 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
825 cli->cl_avail_grant -= cli->cl_reserved_grant;
826 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
827 cli->cl_avail_grant -= cli->cl_dirty_grant;
829 cli->cl_avail_grant -=
830 cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
/* defensive: a buggy server can leave us with negative grant */
833 if (cli->cl_avail_grant < 0) {
834 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
835 cli_name(cli), cli->cl_avail_grant,
836 ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
837 /* workaround for servers which do not have the patch from
839 cli->cl_avail_grant = ocd->ocd_grant;
842 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
845 /* overhead for each extent insertion */
846 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
847 /* determine the appropriate chunk size used by osc_extent. */
848 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT,
849 ocd->ocd_grant_blkbits);
850 /* determine maximum extent size, in #pages */
851 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
852 cli->cl_max_extent_pages = size >> PAGE_CACHE_SHIFT;
853 if (cli->cl_max_extent_pages == 0)
854 cli->cl_max_extent_pages = 1;
/* server without GRANT_PARAM: use conservative defaults */
856 cli->cl_grant_extent_tax = 0;
857 cli->cl_chunkbits = PAGE_CACHE_SHIFT;
858 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
860 spin_unlock(&cli->cl_loi_list_lock);
862 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
863 "chunk bits: %d cl_max_extent_pages: %d\n",
865 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
866 cli->cl_max_extent_pages);
868 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
869 list_empty(&cli->cl_grant_shrink_list))
870 osc_add_shrink_grant(cli);
873 /* We assume that the reason this OSC got a short read is because it read
874 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
875 * via the LOV, and it _knows_ it's reading inside the file, it's just that
876 * this stripe never got written at or beyond this stripe offset yet. */
/*
 * Zero-fill the tail of a short read: skip the @nob_read bytes that
 * arrived, zero the remainder of the page containing EOF, then zero
 * every page after it.
 */
877 static void handle_short_read(int nob_read, size_t page_count,
878 struct brw_page **pga)
883 /* skip bytes read OK */
884 while (nob_read > 0) {
885 LASSERT (page_count > 0);
887 if (pga[i]->count > nob_read) {
888 /* EOF inside this page */
889 ptr = kmap(pga[i]->pg) +
890 (pga[i]->off & ~PAGE_MASK);
891 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
898 nob_read -= pga[i]->count;
903 /* zero remaining pages */
904 while (page_count-- > 0) {
905 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
906 memset(ptr, 0, pga[i]->count);
/*
 * Validate a BRW_WRITE reply: the per-niobuf RC vector must be present
 * and of the expected size, every per-niobuf rc must be zero (negative
 * rcs are propagated, unexpected positive ones rejected), and the bulk
 * must have transferred exactly @requested_nob bytes.
 */
912 static int check_write_rcs(struct ptlrpc_request *req,
913 int requested_nob, int niocount,
914 size_t page_count, struct brw_page **pga)
919 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
920 sizeof(*remote_rcs) *
922 if (remote_rcs == NULL) {
923 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
927 /* return error if any niobuf was in error */
928 for (i = 0; i < niocount; i++) {
929 if ((int)remote_rcs[i] < 0)
930 return(remote_rcs[i]);
/* a nonzero, non-negative rc is a protocol violation */
932 if (remote_rcs[i] != 0) {
933 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
934 i, remote_rcs[i], req);
/* the bulk must account for every requested byte */
939 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
940 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
941 req->rq_bulk->bd_nob_transferred, requested_nob);
/*
 * Two brw_pages can share a single niobuf iff they are contiguous in
 * file offset and their flags agree.  Flags listed in @mask below are
 * known-safe to mix; any other flag mismatch triggers a warning but
 * the pages are still treated as unmergeable.
 */
948 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
950 if (p1->flag != p2->flag) {
951 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
952 OBD_BRW_SYNC | OBD_BRW_ASYNC |
953 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
955 /* warn if we try to combine flags that we don't know to be
957 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
958 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
959 "report this at https://jira.hpdd.intel.com/\n",
/* contiguity check: p2 must start exactly where p1 ends */
965 return (p1->off + p1->count == p2->off);
/*
 * Compute the bulk-data checksum over the first @nob bytes spanned by
 * @pga using the hash algorithm selected by @cksum_type.  Honors two
 * fault-injection points: OBD_FAIL_OSC_CHECKSUM_RECEIVE corrupts the
 * first read page before hashing, and OBD_FAIL_OSC_CHECKSUM_SEND
 * falsifies only the checksum on writes (data stays correct for redo).
 */
968 static u32 osc_checksum_bulk(int nob, size_t pg_count,
969 struct brw_page **pga, int opc,
970 cksum_type_t cksum_type)
974 struct cfs_crypto_hash_desc *hdesc;
975 unsigned int bufsize;
977 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
979 LASSERT(pg_count > 0);
981 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
983 CERROR("Unable to initialize checksum hash %s\n",
984 cfs_crypto_hash_name(cfs_alg));
985 return PTR_ERR(hdesc);
988 while (nob > 0 && pg_count > 0) {
989 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
991 /* corrupt the data before we compute the checksum, to
992 * simulate an OST->client data error */
993 if (i == 0 && opc == OST_READ &&
994 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
995 unsigned char *ptr = kmap(pga[i]->pg);
996 int off = pga[i]->off & ~PAGE_MASK;
998 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
/* fold this page's in-use bytes into the running hash */
1001 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1002 pga[i]->off & ~PAGE_MASK,
1004 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1005 (int)(pga[i]->off & ~PAGE_MASK));
1007 nob -= pga[i]->count;
1012 bufsize = sizeof(cksum);
1013 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1015 /* For sending we only compute the wrong checksum instead
1016 * of corrupting the data so it is still correct on a redo */
1017 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Build (but do not send) a BRW read/write request covering
 * @page_count pages in @pga, returning it in *reqp.  Write requests
 * come from the shared request pool; reads are allocated fresh.  The
 * ioobj/niobuf buffers are sized by the number of mergeable page runs,
 * a bulk descriptor is attached, grant accounting is packed via
 * osc_announce_cached(), and a bulk checksum is added when enabled.
 * @resend marks the request so the server can recognize a recovery
 * resend (OBD_FL_RECOV_RESEND).
 */
1024 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1025 u32 page_count, struct brw_page **pga,
1026 struct ptlrpc_request **reqp, int resend)
1028 struct ptlrpc_request *req;
1029 struct ptlrpc_bulk_desc *desc;
1030 struct ost_body *body;
1031 struct obd_ioobj *ioobj;
1032 struct niobuf_remote *niobuf;
1033 int niocount, i, requested_nob, opc, rc;
1034 struct osc_brw_async_args *aa;
1035 struct req_capsule *pill;
1036 struct brw_page *pg_prev;
/* fault-injection points for BRW request preparation */
1039 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1040 RETURN(-ENOMEM); /* Recoverable */
1041 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1042 RETURN(-EINVAL); /* Fatal */
/* writes draw from the shared pool so they can proceed under memory
 * pressure; reads are allocated normally */
1044 if ((cmd & OBD_BRW_WRITE) != 0) {
1046 req = ptlrpc_request_alloc_pool(cli->cl_import,
1048 &RQF_OST_BRW_WRITE);
1051 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* count the number of niobufs needed: contiguous same-flag pages merge */
1056 for (niocount = i = 1; i < page_count; i++) {
1057 if (!can_merge_pages(pga[i - 1], pga[i]))
1061 pill = &req->rq_pill;
1062 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1064 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1065 niocount * sizeof(*niobuf));
1067 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1069 ptlrpc_request_free(req);
1072 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1073 ptlrpc_at_set_req_timeout(req);
1074 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1076 req->rq_no_retry_einprogress = 1;
1078 desc = ptlrpc_prep_bulk_imp(req, page_count,
1079 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1080 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1081 PTLRPC_BULK_PUT_SINK) |
1082 PTLRPC_BULK_BUF_KIOV,
1084 &ptlrpc_bulk_kiov_pin_ops);
1087 GOTO(out, rc = -ENOMEM);
1088 /* NB request now owns desc and will free it when it gets freed */
1090 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1091 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1092 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1093 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1095 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1097 obdo_to_ioobj(oa, ioobj);
1098 ioobj->ioo_bufcnt = niocount;
1099 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1100 * that might be send for this request. The actual number is decided
1101 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1102 * "max - 1" for old client compatibility sending "0", and also so the
1103 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1104 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1105 LASSERT(page_count > 0);
/* walk the pages: attach each to the bulk and build merged niobufs */
1107 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1108 struct brw_page *pg = pga[i];
1109 int poff = pg->off & ~PAGE_MASK;
1111 LASSERT(pg->count > 0);
1112 /* make sure there is no gap in the middle of page array */
1113 LASSERTF(page_count == 1 ||
1114 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1115 ergo(i > 0 && i < page_count - 1,
1116 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1117 ergo(i == page_count - 1, poff == 0)),
1118 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1119 i, page_count, pg, pg->off, pg->count);
1120 LASSERTF(i == 0 || pg->off > pg_prev->off,
1121 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1122 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1124 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1125 pg_prev->pg, page_private(pg_prev->pg),
1126 pg_prev->pg->index, pg_prev->off);
1127 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1128 (pg->flag & OBD_BRW_SRVLOCK));
1130 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1131 requested_nob += pg->count;
/* extend the previous niobuf if mergeable, else start a new one */
1133 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1135 niobuf->rnb_len += pg->count;
1137 niobuf->rnb_offset = pg->off;
1138 niobuf->rnb_len = pg->count;
1139 niobuf->rnb_flags = pg->flag;
/* sanity: we must have filled exactly niocount niobufs */
1144 LASSERTF((void *)(niobuf - niocount) ==
1145 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1146 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1147 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1149 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1151 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1152 body->oa.o_valid |= OBD_MD_FLFLAGS;
1153 body->oa.o_flags = 0;
/* mark recovery resends so the server can detect replays */
1155 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
/* opportunistically return surplus grant with this RPC */
1158 if (osc_should_shrink_grant(cli))
1159 osc_shrink_grant_local(cli, &body->oa);
1161 /* size[REQ_REC_OFF] still sizeof (*body) */
1162 if (opc == OST_WRITE) {
1163 if (cli->cl_checksum &&
1164 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1165 /* store cl_cksum_type in a local variable since
1166 * it can be changed via lprocfs */
1167 cksum_type_t cksum_type = cli->cl_cksum_type;
1169 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1170 oa->o_flags &= OBD_FL_LOCAL_MASK;
1171 body->oa.o_flags = 0;
1173 body->oa.o_flags |= cksum_type_pack(cksum_type);
1174 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1175 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1179 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1181 /* save this in 'oa', too, for later checking */
1182 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1183 oa->o_flags |= cksum_type_pack(cksum_type);
1185 /* clear out the checksum flag, in case this is a
1186 * resend but cl_checksum is no longer set. b=11238 */
1187 oa->o_valid &= ~OBD_MD_FLCKSUM;
1189 oa->o_cksum = body->oa.o_cksum;
1190 /* 1 RC per niobuf */
1191 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1192 sizeof(__u32) * niocount);
/* read path: request that the server checksum its reply bulk */
1194 if (cli->cl_checksum &&
1195 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1196 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1197 body->oa.o_flags = 0;
1198 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1199 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1202 ptlrpc_request_set_replen(req);
1204 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1205 aa = ptlrpc_req_async_args(req);
1207 aa->aa_requested_nob = requested_nob;
1208 aa->aa_nio_count = niocount;
1209 aa->aa_page_count = page_count;
1213 INIT_LIST_HEAD(&aa->aa_oaps);
1216 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1217 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1218 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1219 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
/* error path: release the request (and its bulk descriptor) */
1223 ptlrpc_req_finished(req);
/*
 * Diagnose a write whose server-computed checksum differs from the one
 * the client sent: recompute the checksum over the local pages (with
 * the server's checksum type) and classify whether the data changed
 * client-side after checksumming, changed in transit, or the server
 * used a different checksum type.  Logs a console error with the
 * object/extent details either way.
 */
1227 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1228 __u32 client_cksum, __u32 server_cksum, int nob,
1229 size_t page_count, struct brw_page **pga,
1230 cksum_type_t client_cksum_type)
1234 cksum_type_t cksum_type;
/* exact match: nothing to diagnose */
1236 if (server_cksum == client_cksum) {
1237 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* recompute with the checksum type the server actually used */
1241 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1243 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1246 if (cksum_type != client_cksum_type)
1247 msg = "the server did not use the checksum type specified in "
1248 "the original request - likely a protocol problem";
1249 else if (new_cksum == server_cksum)
1250 msg = "changed on the client after we checksummed it - "
1251 "likely false positive due to mmap IO (bug 11742)";
1252 else if (new_cksum == client_cksum)
1253 msg = "changed in transit before arrival at OST";
1255 msg = "changed in transit AND doesn't match the original - "
1256 "likely false positive due to mmap IO (bug 11742)";
1258 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1259 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1260 msg, libcfs_nid2str(peer->nid),
1261 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1262 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1263 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1264 POSTID(&oa->o_oi), pga[0]->off,
1265 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1266 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1267 "client csum now %x\n", client_cksum, client_cksum_type,
1268 server_cksum, cksum_type, new_cksum);
/*
 * Finish a bulk read/write RPC: unpack the reply, update quota/grant state,
 * verify checksums, and copy reply attributes back into aa->aa_oa.
 *
 * For OST_WRITE: unwrap the bulk security layer, cross-check the write
 * checksum the server returned against what we sent, then validate the
 * per-niobuf RCs.  For OST_READ: unwrap bulk, sanity-check the transferred
 * byte count, zero-fill short reads, and verify the read checksum if the
 * server supplied one.
 */
1272 /* Note rc enters this function as number of bytes transferred */
1273 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1275 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1276 const lnet_process_id_t *peer =
1277 &req->rq_import->imp_connection->c_peer;
1278 struct client_obd *cli = aa->aa_cli;
1279 struct ost_body *body;
1280 u32 client_cksum = 0;
/* -EDQUOT still carries a usable reply (quota flags below); other
 * negative rc values mean the request itself failed. */
1283 if (rc < 0 && rc != -EDQUOT) {
1284 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1288 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1289 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1291 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1295 /* set/clear over quota flag for a uid/gid */
1296 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1297 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1298 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1300 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1301 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1303 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
/* Refresh the client's grant accounting from the reply. */
1306 osc_update_grant(cli, body);
1311 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1312 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1314 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1316 CERROR("Unexpected +ve rc %d\n", rc);
1319 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1321 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* Only check the write checksum if we computed one on send
 * (client_cksum != 0 and OBD_MD_FLCKSUM set in the saved oa). */
1324 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1325 check_write_checksum(&body->oa, peer, client_cksum,
1326 body->oa.o_cksum, aa->aa_requested_nob,
1327 aa->aa_page_count, aa->aa_ppga,
1328 cksum_type_unpack(aa->aa_oa->o_flags)))
1331 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1332 aa->aa_page_count, aa->aa_ppga);
1336 /* The rest of this function executes only for OST_READs */
1338 /* if unwrap_bulk failed, return -EAGAIN to retry */
1339 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1341 GOTO(out, rc = -EAGAIN);
/* rc is bytes transferred; it can never exceed what we asked for. */
1343 if (rc > aa->aa_requested_nob) {
1344 CERROR("Unexpected rc %d (%d requested)\n", rc,
1345 aa->aa_requested_nob);
1349 if (rc != req->rq_bulk->bd_nob_transferred) {
1350 CERROR ("Unexpected rc %d (%d transferred)\n",
1351 rc, req->rq_bulk->bd_nob_transferred);
/* Short read: zero-fill the pages past the returned data. */
1355 if (rc < aa->aa_requested_nob)
1356 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1358 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1359 static int cksum_counter;
1360 u32 server_cksum = body->oa.o_cksum;
1363 cksum_type_t cksum_type;
1365 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1366 body->oa.o_flags : 0);
/* Checksum only the rc bytes actually transferred. */
1367 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1368 aa->aa_ppga, OST_READ,
/* If the bulk came via an LNet router, name it in the error. */
1371 if (peer->nid != req->rq_bulk->bd_sender) {
1373 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1376 if (server_cksum != client_cksum) {
1377 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1378 "%s%s%s inode "DFID" object "DOSTID
1379 " extent ["LPU64"-"LPU64"]\n",
1380 req->rq_import->imp_obd->obd_name,
1381 libcfs_nid2str(peer->nid),
1383 body->oa.o_valid & OBD_MD_FLFID ?
1384 body->oa.o_parent_seq : (__u64)0,
1385 body->oa.o_valid & OBD_MD_FLFID ?
1386 body->oa.o_parent_oid : 0,
1387 body->oa.o_valid & OBD_MD_FLFID ?
1388 body->oa.o_parent_ver : 0,
1389 POSTID(&body->oa.o_oi),
1390 aa->aa_ppga[0]->off,
1391 aa->aa_ppga[aa->aa_page_count-1]->off +
1392 aa->aa_ppga[aa->aa_page_count-1]->count -
1394 CERROR("client %x, server %x, cksum_type %x\n",
1395 client_cksum, server_cksum, cksum_type);
1397 aa->aa_oa->o_cksum = client_cksum;
1401 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* We asked for a checksum but the server did not send one;
 * (x & -x) == x rate-limits the error to powers of two. */
1404 } else if (unlikely(client_cksum)) {
1405 static int cksum_missed;
1408 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1409 CERROR("Checksum %u requested from %s but not sent\n",
1410 cksum_missed, libcfs_nid2str(peer->nid));
/* Copy reply attributes back into the caller's obdo. */
1416 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1417 aa->aa_oa, &body->oa);
/*
 * Rebuild and resend a BRW RPC after a recoverable error (e.g. -EINPROGRESS).
 *
 * Creates a fresh request with the same pages via osc_brw_prep_request(),
 * moves the oap/extent lists and async args from the old request onto it,
 * re-points each oap's request reference at the new RPC, and queues it on
 * ptlrpcd.  Resend delay is capped at the request timeout.
 */
1422 static int osc_brw_redo_request(struct ptlrpc_request *request,
1423 struct osc_brw_async_args *aa, int rc)
1425 struct ptlrpc_request *new_req;
1426 struct osc_brw_async_args *new_aa;
1427 struct osc_async_page *oap;
/* -EINPROGRESS resends are expected; log them quietly. */
1430 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1431 "redo for recoverable error %d", rc);
1433 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1434 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1435 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1436 aa->aa_ppga, &new_req, 1);
/* Abort the redo if any page's IO was interrupted by a signal. */
1440 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1441 if (oap->oap_request != NULL) {
1442 LASSERTF(request == oap->oap_request,
1443 "request %p != oap_request %p\n",
1444 request, oap->oap_request);
1445 if (oap->oap_interrupted) {
1446 ptlrpc_req_finished(new_req);
1451 /* New request takes over pga and oaps from old request.
1452 * Note that copying a list_head doesn't work, need to move it... */
1454 new_req->rq_interpret_reply = request->rq_interpret_reply;
1455 new_req->rq_async_args = request->rq_async_args;
1456 new_req->rq_commit_cb = request->rq_commit_cb;
1457 /* cap resend delay to the current request timeout, this is similar to
1458 * what ptlrpc does (see after_reply()) */
1459 if (aa->aa_resends > new_req->rq_timeout)
1460 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1462 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
/* Pin the import generation so eviction is detected on completion. */
1463 new_req->rq_generation_set = 1;
1464 new_req->rq_import_generation = request->rq_import_generation;
1466 new_aa = ptlrpc_req_async_args(new_req);
/* Move (not copy) the oap and extent lists onto the new args. */
1468 INIT_LIST_HEAD(&new_aa->aa_oaps);
1469 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1470 INIT_LIST_HEAD(&new_aa->aa_exts);
1471 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1472 new_aa->aa_resends = aa->aa_resends;
/* Swap each page's request reference from the old RPC to the new one. */
1474 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1475 if (oap->oap_request) {
1476 ptlrpc_req_finished(oap->oap_request);
1477 oap->oap_request = ptlrpc_request_addref(new_req);
1481 /* XXX: This code will run into problem if we're going to support
1482 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1483 * and wait for all of them to be finished. We should inherit request
1484 * set from old request. */
1485 ptlrpcd_add_req(new_req);
1487 DEBUG_REQ(D_INFO, new_req, "new request");
1492 * ugh, we want disk allocation on the target to happen in offset order. we'll
1493 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1494 * fine for our small page arrays and doesn't require allocation. its an
1495 * insertion sort that swaps elements that are strides apart, shrinking the
1496 * stride down until its '1' and the array is sorted.
/* In-place shellsort of @num brw_page pointers by ->off, ascending. */
1498 static void sort_brw_pages(struct brw_page **array, int num)
1501 struct brw_page *tmp;
/* Knuth gap sequence: 1, 4, 13, 40, ... grown past num, then shrunk. */
1505 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* Gapped insertion sort at the current stride. */
1510 for (i = stride ; i < num ; i++) {
1513 while (j >= stride && array[j - stride]->off > tmp->off) {
1514 array[j] = array[j - stride];
1519 } while (stride > 1);
/* Free the brw_page pointer array itself (not the pages it points at). */
1522 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1524 LASSERT(ppga != NULL);
1525 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * rq_interpret_reply callback for BRW RPCs.
 *
 * Finalizes the request via osc_brw_fini_request(), retries recoverable
 * errors (unlimited retries for -EINPROGRESS, bounded otherwise), then on
 * completion pushes the reply attributes (blocks/times, and size/KMS for
 * writes) into the cl_object, finishes the attached extents, releases the
 * page array, updates in-flight RPC accounting and re-plugs queued IO.
 */
1528 static int brw_interpret(const struct lu_env *env,
1529 struct ptlrpc_request *req, void *data, int rc)
1531 struct osc_brw_async_args *aa = data;
1532 struct osc_extent *ext;
1533 struct osc_extent *tmp;
1534 struct client_obd *cli = aa->aa_cli;
1537 rc = osc_brw_fini_request(req, rc);
1538 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1539 /* When server return -EINPROGRESS, client should always retry
1540 * regardless of the number of times the bulk was resent already. */
1541 if (osc_recoverable_error(rc)) {
/* Import generation changed => we were evicted; don't resend. */
1542 if (req->rq_import_generation !=
1543 req->rq_import->imp_generation) {
1544 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1545 ""DOSTID", rc = %d.\n",
1546 req->rq_import->imp_obd->obd_name,
1547 POSTID(&aa->aa_oa->o_oi), rc);
1548 } else if (rc == -EINPROGRESS ||
1549 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1550 rc = osc_brw_redo_request(req, aa, rc);
1552 CERROR("%s: too many resent retries for object: "
1553 ""LPU64":"LPU64", rc = %d.\n",
1554 req->rq_import->imp_obd->obd_name,
1555 POSTID(&aa->aa_oa->o_oi), rc);
1560 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* Success path: propagate reply attributes to the cl_object. */
1565 struct obdo *oa = aa->aa_oa;
1566 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1567 unsigned long valid = 0;
1568 struct cl_object *obj;
1569 struct osc_async_page *last;
1571 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1572 obj = osc2cl(last->oap_obj);
1574 cl_object_attr_lock(obj);
1575 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1576 attr->cat_blocks = oa->o_blocks;
1577 valid |= CAT_BLOCKS;
1579 if (oa->o_valid & OBD_MD_FLMTIME) {
1580 attr->cat_mtime = oa->o_mtime;
1583 if (oa->o_valid & OBD_MD_FLATIME) {
1584 attr->cat_atime = oa->o_atime;
1587 if (oa->o_valid & OBD_MD_FLCTIME) {
1588 attr->cat_ctime = oa->o_ctime;
1592 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1593 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
/* End offset of the last page written by this RPC. */
1594 loff_t last_off = last->oap_count + last->oap_obj_off +
1597 /* Change file size if this is an out of quota or
1598 * direct IO write and it extends the file size */
1599 if (loi->loi_lvb.lvb_size < last_off) {
1600 attr->cat_size = last_off;
1603 /* Extend KMS if it's not a lockless write */
1604 if (loi->loi_kms < last_off &&
1605 oap2osc_page(last)->ops_srvlock == 0) {
1606 attr->cat_kms = last_off;
1612 cl_object_attr_update(env, obj, attr, valid);
1613 cl_object_attr_unlock(obj);
1615 OBDO_FREE(aa->aa_oa);
/* Track pages pinned until the server commits the write. */
1617 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1618 osc_inc_unstable_pages(req);
1620 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1621 list_del_init(&ext->oe_link);
1622 osc_extent_finish(env, ext, 1, rc);
1624 LASSERT(list_empty(&aa->aa_exts));
1625 LASSERT(list_empty(&aa->aa_oaps));
1627 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1628 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1630 spin_lock(&cli->cl_loi_list_lock);
1631 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1632 * is called so we know whether to go to sync BRWs or wait for more
1633 * RPCs to complete */
1634 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1635 cli->cl_w_in_flight--;
1637 cli->cl_r_in_flight--;
1638 osc_wake_cache_waiters(cli);
1639 spin_unlock(&cli->cl_loi_list_lock);
/* Kick the IO engine: a slot just freed up. */
1641 osc_io_unplug(env, cli, NULL);
/*
 * rq_commit_cb for BRW writes: invoked when the server commits the
 * transaction.  Releases the "unstable pages" accounting taken in
 * brw_interpret(), or marks the request committed if the increment has
 * not happened yet (the race described in the comment below).
 */
1645 static void brw_commit(struct ptlrpc_request *req)
1647 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1648 * this called via the rq_commit_cb, I need to ensure
1649 * osc_dec_unstable_pages is still called. Otherwise unstable
1650 * pages may be leaked. */
1651 spin_lock(&req->rq_lock);
1652 if (likely(req->rq_unstable)) {
1653 req->rq_unstable = 0;
1654 spin_unlock(&req->rq_lock);
/* Drop the lock before the potentially heavier accounting call. */
1656 osc_dec_unstable_pages(req);
1658 req->rq_committed = 1;
1659 spin_unlock(&req->rq_lock);
1664 * Build an RPC by the list of extent @ext_list. The caller must ensure
1665 * that the total pages in this list are NOT over max pages per RPC.
1666 * Extents in the list must be in OES_RPC state.
/*
 * Assemble a single BRW read/write RPC from the extents in @ext_list:
 * collect pages, build the request (osc_brw_prep_request), attach the
 * extent/page lists as async args, update in-flight stats, and hand the
 * request to ptlrpcd.  On failure, frees the allocated state and finishes
 * every extent with an error.  @cmd is OBD_BRW_READ or OBD_BRW_WRITE.
 */
1668 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1669 struct list_head *ext_list, int cmd)
1671 struct ptlrpc_request *req = NULL;
1672 struct osc_extent *ext;
1673 struct brw_page **pga = NULL;
1674 struct osc_brw_async_args *aa = NULL;
1675 struct obdo *oa = NULL;
1676 struct osc_async_page *oap;
1677 struct osc_object *obj = NULL;
1678 struct cl_req_attr *crattr = NULL;
1679 loff_t starting_offset = OBD_OBJECT_EOF;
1680 loff_t ending_offset = 0;
1684 bool soft_sync = false;
1685 bool interrupted = false;
1689 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1690 struct ost_body *body;
1692 LASSERT(!list_empty(ext_list));
1694 /* add pages into rpc_list to build BRW rpc */
/* First pass: total up pages/grant and note memory-pressure state. */
1695 list_for_each_entry(ext, ext_list, oe_link) {
1696 LASSERT(ext->oe_state == OES_RPC);
1697 mem_tight |= ext->oe_memalloc;
1698 grant += ext->oe_grants;
1699 page_count += ext->oe_nr_pages;
1704 soft_sync = osc_over_unstable_soft_limit(cli);
1706 mpflag = cfs_memory_pressure_get_and_set();
1708 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1710 GOTO(out, rc = -ENOMEM);
1714 GOTO(out, rc = -ENOMEM);
/* Second pass: fill the page array and compute the byte extent
 * [starting_offset, ending_offset) covered by this RPC. */
1717 list_for_each_entry(ext, ext_list, oe_link) {
1718 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1720 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1722 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1723 pga[i] = &oap->oap_brw_page;
1724 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1727 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1728 if (starting_offset == OBD_OBJECT_EOF ||
1729 starting_offset > oap->oap_obj_off)
1730 starting_offset = oap->oap_obj_off;
1732 LASSERT(oap->oap_page_off == 0);
1733 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1734 ending_offset = oap->oap_obj_off +
1737 LASSERT(oap->oap_page_off + oap->oap_count ==
1739 if (oap->oap_interrupted)
1744 /* first page in the list */
1745 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
/* Seed the obdo with all request attributes from the cl layer. */
1747 crattr = &osc_env_info(env)->oti_req_attr;
1748 memset(crattr, 0, sizeof(*crattr));
1749 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1750 crattr->cra_flags = ~0ULL;
1751 crattr->cra_page = oap2cl_page(oap);
1752 crattr->cra_oa = oa;
1753 cl_req_attr_set(env, osc2cl(obj), crattr);
1755 if (cmd == OBD_BRW_WRITE)
1756 oa->o_grant_used = grant;
/* Server wants pages in offset order for allocation locality. */
1758 sort_brw_pages(pga, page_count);
1759 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1761 CERROR("prep_req failed: %d\n", rc);
1765 req->rq_commit_cb = brw_commit;
1766 req->rq_interpret_reply = brw_interpret;
1767 req->rq_memalloc = mem_tight != 0;
1768 oap->oap_request = ptlrpc_request_addref(req);
1769 if (interrupted && !req->rq_intr)
1770 ptlrpc_mark_interrupted(req);
1772 /* Need to update the timestamps after the request is built in case
1773 * we race with setattr (locally or in queue at OST). If OST gets
1774 * later setattr before earlier BRW (as determined by the request xid),
1775 * the OST will not use BRW timestamps. Sadly, there is no obvious
1776 * way to do this in a single call. bug 10150 */
1777 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1778 crattr->cra_oa = &body->oa;
1779 crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1780 cl_req_attr_set(env, osc2cl(obj), crattr);
1781 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1783 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
/* Hand ownership of the oap and extent lists to the request. */
1784 aa = ptlrpc_req_async_args(req);
1785 INIT_LIST_HEAD(&aa->aa_oaps);
1786 list_splice_init(&rpc_list, &aa->aa_oaps);
1787 INIT_LIST_HEAD(&aa->aa_exts);
1788 list_splice_init(ext_list, &aa->aa_exts);
1790 spin_lock(&cli->cl_loi_list_lock);
1791 starting_offset >>= PAGE_CACHE_SHIFT;
/* In-flight counters and lprocfs histograms, per direction. */
1792 if (cmd == OBD_BRW_READ) {
1793 cli->cl_r_in_flight++;
1794 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1795 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1796 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1797 starting_offset + 1);
1799 cli->cl_w_in_flight++;
1800 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1801 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1802 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1803 starting_offset + 1);
1805 spin_unlock(&cli->cl_loi_list_lock);
1807 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1808 page_count, aa, cli->cl_r_in_flight,
1809 cli->cl_w_in_flight);
1811 ptlrpcd_add_req(req);
1817 cfs_memory_pressure_restore(mpflag);
/* Error path: nothing was queued; tear everything down. */
1820 LASSERT(req == NULL);
1825 OBD_FREE(pga, sizeof(*pga) * page_count);
1826 /* this should happen rarely and is pretty bad, it makes the
1827 * pending list not follow the dirty order */
1828 while (!list_empty(ext_list)) {
1829 ext = list_entry(ext_list->next, struct osc_extent,
1831 list_del_init(&ext->oe_link);
1832 osc_extent_finish(env, ext, 0, rc);
/*
 * Attach @einfo->ei_cbdata to an LDLM lock's l_ast_data, after asserting
 * the lock's callbacks/type match what the enqueue info expects.  Sets
 * l_ast_data only if it is currently NULL; returns whether l_ast_data now
 * equals the requested data (the success return sits between the visible
 * lines -- the result flows out via the caller's truth test).
 */
1838 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1839 struct ldlm_enqueue_info *einfo)
1841 void *data = einfo->ei_cbdata;
/* Sanity: this must be one of "our" extent locks. */
1844 LASSERT(lock != NULL);
1845 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1846 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1847 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1848 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1850 lock_res_and_lock(lock);
/* First claimant wins; a matching existing value also counts as set. */
1852 if (lock->l_ast_data == NULL)
1853 lock->l_ast_data = data;
1854 if (lock->l_ast_data == data)
1857 unlock_res_and_lock(lock);
/*
 * Handle-based wrapper around osc_set_lock_data_with_check(): resolve the
 * lock from @lockh, attach the enqueue data, and drop the reference.
 * A lookup failure (lock gone, likely client eviction) is logged.
 */
1862 static int osc_set_data_with_check(struct lustre_handle *lockh,
1863 struct ldlm_enqueue_info *einfo)
1865 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1869 set = osc_set_lock_data_with_check(lock, einfo);
1870 LDLM_LOCK_PUT(lock);
1872 CERROR("lockh %p, data %p - client evicted?\n",
1873 lockh, einfo->ei_cbdata);
/*
 * Common tail of an OSC lock enqueue: translate an intent-aborted reply
 * into its embedded status, mark the LVB ready where appropriate, invoke
 * the caller's upcall with the final error code, and release the enqueue
 * reference for matched/granted locks.  @agl flags an async glimpse lock
 * enqueue (declared but its use here is outside the visible lines).
 */
1877 static int osc_enqueue_fini(struct ptlrpc_request *req,
1878 osc_enqueue_upcall_f upcall, void *cookie,
1879 struct lustre_handle *lockh, enum ldlm_mode mode,
1880 __u64 *flags, int agl, int errcode)
1882 bool intent = *flags & LDLM_FL_HAS_INTENT;
1886 /* The request was created before ldlm_cli_enqueue call. */
1887 if (intent && errcode == ELDLM_LOCK_ABORTED) {
1888 struct ldlm_reply *rep;
1890 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1891 LASSERT(rep != NULL);
/* The real result of an aborted intent lives in lock_policy_res1. */
1893 rep->lock_policy_res1 =
1894 ptlrpc_status_ntoh(rep->lock_policy_res1);
1895 if (rep->lock_policy_res1)
1896 errcode = rep->lock_policy_res1;
1898 *flags |= LDLM_FL_LVB_READY;
1899 } else if (errcode == ELDLM_OK) {
1900 *flags |= LDLM_FL_LVB_READY;
1903 /* Call the update callback. */
1904 rc = (*upcall)(cookie, lockh, errcode);
1906 /* release the reference taken in ldlm_cli_enqueue() */
1907 if (errcode == ELDLM_LOCK_MATCHED)
1909 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1910 ldlm_lock_decref(lockh, mode);
/*
 * Interpret callback for an async OSC lock enqueue: re-acquire the lock
 * from the saved handle, complete the LDLM side via ldlm_cli_enqueue_fini(),
 * then run the OSC upcall via osc_enqueue_fini().  An extra lock reference
 * is held across the upcall so a blocking AST posted for a failed lock
 * cannot arrive before the upcall has executed.
 */
1915 static int osc_enqueue_interpret(const struct lu_env *env,
1916 struct ptlrpc_request *req,
1917 struct osc_enqueue_args *aa, int rc)
1919 struct ldlm_lock *lock;
1920 struct lustre_handle *lockh = &aa->oa_lockh;
1921 enum ldlm_mode mode = aa->oa_mode;
1922 struct ost_lvb *lvb = aa->oa_lvb;
1923 __u32 lvb_len = sizeof(*lvb);
1928 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
1930 lock = ldlm_handle2lock(lockh);
1931 LASSERTF(lock != NULL,
1932 "lockh "LPX64", req %p, aa %p - client evicted?\n",
1933 lockh->cookie, req, aa);
1935 /* Take an additional reference so that a blocking AST that
1936 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
1937 * to arrive after an upcall has been executed by
1938 * osc_enqueue_fini(). */
1939 ldlm_lock_addref(lockh, mode);
1941 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
1942 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
1944 /* Let CP AST to grant the lock first. */
1945 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* AGL path stores no lvb/flags in aa; use a local flags word.
 * NOTE(review): the branch condition sits outside the visible lines. */
1948 LASSERT(aa->oa_lvb == NULL);
1949 LASSERT(aa->oa_flags == NULL);
1950 aa->oa_flags = &flags;
1953 /* Complete obtaining the lock procedure. */
1954 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
1955 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
1957 /* Complete osc stuff. */
1958 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
1959 aa->oa_flags, aa->oa_agl, rc);
1961 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* Drop the extra reference taken above, and the handle2lock ref. */
1963 ldlm_lock_decref(lockh, mode);
1964 LDLM_LOCK_PUT(lock);
/* Sentinel "request set" value: callers pass this instead of a real set to
 * mean "queue the request on ptlrpcd" (compared by address, never used). */
1968 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
1970 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
1971 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
1972 * other synchronous requests, however keeping some locks and trying to obtain
1973 * others may take a considerable amount of time in a case of ost failure; and
1974 * when other sync requests do not get released lock from a client, the client
1975 * is evicted from the cluster -- such scenarious make the life difficult, so
1976 * release locks just after they are obtained. */
/*
 * Enqueue (or match) an OST extent lock.  Rounds the extent to page
 * boundaries, first tries to match an existing compatible lock (reads may
 * reuse a PW lock), and otherwise issues an LDLM enqueue -- asynchronously
 * via @rqset/ptlrpcd with osc_enqueue_interpret as the completion, or
 * synchronously finishing through osc_enqueue_fini().  @agl marks a
 * speculative async-glimpse enqueue; @upcall(@cookie, ...) receives the
 * final result in all cases.
 */
1977 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
1978 __u64 *flags, union ldlm_policy_data *policy,
1979 struct ost_lvb *lvb, int kms_valid,
1980 osc_enqueue_upcall_f upcall, void *cookie,
1981 struct ldlm_enqueue_info *einfo,
1982 struct ptlrpc_request_set *rqset, int async, int agl)
1984 struct obd_device *obd = exp->exp_obd;
1985 struct lustre_handle lockh = { 0 };
1986 struct ptlrpc_request *req = NULL;
1987 int intent = *flags & LDLM_FL_HAS_INTENT;
/* AGL tolerates a not-yet-ready LVB; normal enqueue requires it. */
1988 __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
1989 enum ldlm_mode mode;
1993 /* Filesystem lock extents are extended to page boundaries so that
1994 * dealing with the page cache is a little smoother. */
1995 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
1996 policy->l_extent.end |= ~PAGE_MASK;
1999 * kms is not valid when either object is completely fresh (so that no
2000 * locks are cached), or object was evicted. In the latter case cached
2001 * lock cannot be used, because it would prime inode state with
2002 * potentially stale LVB.
2007 /* Next, search for already existing extent locks that will cover us */
2008 /* If we're trying to read, we also search for an existing PW lock. The
2009 * VFS and page cache already protect us locally, so lots of readers/
2010 * writers can share a single PW lock.
2012 * There are problems with conversion deadlocks, so instead of
2013 * converting a read lock to a write lock, we'll just enqueue a new
2016 * At some point we should cancel the read lock instead of making them
2017 * send us a blocking callback, but there are problems with canceling
2018 * locks out from other users right now, too. */
2019 mode = einfo->ei_mode;
2020 if (einfo->ei_mode == LCK_PR)
2022 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2023 einfo->ei_type, policy, mode, &lockh, 0);
2025 struct ldlm_lock *matched;
2027 if (*flags & LDLM_FL_TEST_LOCK)
2030 matched = ldlm_handle2lock(&lockh);
2032 /* AGL enqueues DLM locks speculatively. Therefore if
2033 * it already exists a DLM lock, it wll just inform the
2034 * caller to cancel the AGL process for this stripe. */
2035 ldlm_lock_decref(&lockh, mode);
2036 LDLM_LOCK_PUT(matched);
2038 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2039 *flags |= LDLM_FL_LVB_READY;
2041 /* We already have a lock, and it's referenced. */
2042 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2044 ldlm_lock_decref(&lockh, mode);
2045 LDLM_LOCK_PUT(matched);
2048 ldlm_lock_decref(&lockh, mode);
2049 LDLM_LOCK_PUT(matched);
2054 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
/* No existing lock: build the enqueue request (LVB variant). */
2058 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2059 &RQF_LDLM_ENQUEUE_LVB);
2063 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2065 ptlrpc_request_free(req);
2069 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2071 ptlrpc_request_set_replen(req);
2074 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2075 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2077 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2078 sizeof(*lvb), LVB_T_OST, &lockh, async);
/* Async path: stash context for osc_enqueue_interpret and queue. */
2081 struct osc_enqueue_args *aa;
2082 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2083 aa = ptlrpc_req_async_args(req);
2085 aa->oa_mode = einfo->ei_mode;
2086 aa->oa_type = einfo->ei_type;
2087 lustre_handle_copy(&aa->oa_lockh, &lockh);
2088 aa->oa_upcall = upcall;
2089 aa->oa_cookie = cookie;
2092 aa->oa_flags = flags;
2095 /* AGL is essentially to enqueue an DLM lock
2096 * in advance, so we don't care about the
2097 * result of AGL enqueue. */
2099 aa->oa_flags = NULL;
2102 req->rq_interpret_reply =
2103 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2104 if (rqset == PTLRPCD_SET)
2105 ptlrpcd_add_req(req);
2107 ptlrpc_set_add_req(rqset, req);
2108 } else if (intent) {
2109 ptlrpc_req_finished(req);
/* Sync path: finish the enqueue inline. */
2114 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2117 ptlrpc_req_finished(req);
/*
 * Match an existing OST extent lock without enqueueing a new one.
 * Rounds the extent to page boundaries, runs ldlm_lock_match(), attaches
 * @data to the matched lock, and -- unless LDLM_FL_TEST_LOCK -- keeps a
 * reference on the handle for the caller.  If a PW lock satisfied a PR
 * request, the reference is converted (addref PR, decref PW) so the caller
 * sees the mode it asked for.
 */
2122 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2123 enum ldlm_type type, union ldlm_policy_data *policy,
2124 enum ldlm_mode mode, __u64 *flags, void *data,
2125 struct lustre_handle *lockh, int unref)
2127 struct obd_device *obd = exp->exp_obd;
2128 __u64 lflags = *flags;
/* Fault-injection point used by tests to force a match miss. */
2132 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2135 /* Filesystem lock extents are extended to page boundaries so that
2136 * dealing with the page cache is a little smoother */
2137 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2138 policy->l_extent.end |= ~PAGE_MASK;
2140 /* Next, search for already existing extent locks that will cover us */
2141 /* If we're trying to read, we also search for an existing PW lock. The
2142 * VFS and page cache already protect us locally, so lots of readers/
2143 * writers can share a single PW lock. */
2147 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2148 res_id, type, policy, rc, lockh, unref);
/* Matched: bind caller's data to the lock, or bail out on mismatch. */
2151 if (!osc_set_data_with_check(lockh, data)) {
2152 if (!(lflags & LDLM_FL_TEST_LOCK))
2153 ldlm_lock_decref(lockh, rc);
/* PW lock satisfied a PR request: convert the reference mode. */
2157 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2158 ldlm_lock_addref(lockh, LCK_PR);
2159 ldlm_lock_decref(lockh, LCK_PW);
/*
 * Interpret callback for async OST_STATFS: unpack the obd_statfs from the
 * reply, copy it into the caller's obd_info, and run the oi_cb_up
 * completion.  Connection errors are tolerated for OBD_STATFS_NODELAY
 * requests (they are expected while the import is down).
 */
2166 static int osc_statfs_interpret(const struct lu_env *env,
2167 struct ptlrpc_request *req,
2168 struct osc_async_args *aa, int rc)
2170 struct obd_statfs *msfs;
2174 /* The request has in fact never been sent
2175 * due to issues at a higher level (LOV).
2176 * Exit immediately since the caller is
2177 * aware of the problem and takes care
2178 * of the clean up */
2181 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2182 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2188 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2190 GOTO(out, rc = -EPROTO);
/* Struct copy of the server's statfs into the caller's buffer. */
2193 *aa->aa_oi->oi_osfs = *msfs;
2195 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Issue an asynchronous OST_STATFS request on @rqset; the reply is handled
 * by osc_statfs_interpret(), which fills @oinfo->oi_osfs and invokes
 * oi_cb_up.  NOTE(review): @max_age is accepted but, per the comment
 * below, not transmitted -- confirm intended behavior against callers.
 */
2199 static int osc_statfs_async(struct obd_export *exp,
2200 struct obd_info *oinfo, __u64 max_age,
2201 struct ptlrpc_request_set *rqset)
2203 struct obd_device *obd = class_exp2obd(exp);
2204 struct ptlrpc_request *req;
2205 struct osc_async_args *aa;
2209 /* We could possibly pass max_age in the request (as an absolute
2210 * timestamp or a "seconds.usec ago") so the target can avoid doing
2211 * extra calls into the filesystem if that isn't necessary (e.g.
2212 * during mount that would help a bit). Having relative timestamps
2213 * is not so great if request processing is slow, while absolute
2214 * timestamps are not ideal because they need time synchronization. */
2215 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2219 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2221 ptlrpc_request_free(req);
2224 ptlrpc_request_set_replen(req);
2225 req->rq_request_portal = OST_CREATE_PORTAL;
2226 ptlrpc_at_set_req_timeout(req);
2228 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2229 /* procfs requests not want stat in wait for avoid deadlock */
2230 req->rq_no_resend = 1;
2231 req->rq_no_delay = 1;
2234 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2235 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
/* Stash the obd_info so the interpret callback can find it. */
2236 aa = ptlrpc_req_async_args(req);
2239 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: send the request with ptlrpc_queue_wait() and
 * copy the server's obd_statfs into @osfs.  Takes its own reference on
 * the import under cl_sem to serialize against client disconnect.
 * NOTE(review): @max_age is accepted but not transmitted (see comment
 * below) -- confirm intended behavior against callers.
 */
2243 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2244 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2246 struct obd_device *obd = class_exp2obd(exp);
2247 struct obd_statfs *msfs;
2248 struct ptlrpc_request *req;
2249 struct obd_import *imp = NULL;
2253 /*Since the request might also come from lprocfs, so we need
2254 *sync this with client_disconnect_export Bug15684*/
2255 down_read(&obd->u.cli.cl_sem);
2256 if (obd->u.cli.cl_import)
2257 imp = class_import_get(obd->u.cli.cl_import);
2258 up_read(&obd->u.cli.cl_sem);
2262 /* We could possibly pass max_age in the request (as an absolute
2263 * timestamp or a "seconds.usec ago") so the target can avoid doing
2264 * extra calls into the filesystem if that isn't necessary (e.g.
2265 * during mount that would help a bit). Having relative timestamps
2266 * is not so great if request processing is slow, while absolute
2267 * timestamps are not ideal because they need time synchronization. */
2268 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* Import reference is only needed for the allocation; drop it now. */
2270 class_import_put(imp);
2275 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2277 ptlrpc_request_free(req);
2280 ptlrpc_request_set_replen(req);
2281 req->rq_request_portal = OST_CREATE_PORTAL;
2282 ptlrpc_at_set_req_timeout(req);
2284 if (flags & OBD_STATFS_NODELAY) {
2285 /* procfs requests not want stat in wait for avoid deadlock */
2286 req->rq_no_resend = 1;
2287 req->rq_no_delay = 1;
2290 rc = ptlrpc_queue_wait(req);
2294 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2296 GOTO(out, rc = -EPROTO);
2303 ptlrpc_req_finished(req);
/*
 * OSC ioctl dispatcher: supports client recovery, import (de)activation
 * and target ping; everything else returns -ENOTTY.  Pins the module for
 * the duration of the call.
 */
2307 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2308 void *karg, void __user *uarg)
2310 struct obd_device *obd = exp->exp_obd;
2311 struct obd_ioctl_data *data = karg;
/* Prevent module unload while an ioctl is in flight. */
2315 if (!try_module_get(THIS_MODULE)) {
2316 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2317 module_name(THIS_MODULE));
2321 case OBD_IOC_CLIENT_RECOVER:
2322 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2323 data->ioc_inlbuf1, 0);
2327 case IOC_OSC_SET_ACTIVE:
2328 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2331 case OBD_IOC_PING_TARGET:
2332 err = ptlrpc_obd_ping(obd);
2335 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2336 cmd, current_comm());
2337 GOTO(out, err = -ENOTTY);
2340 module_put(THIS_MODULE);
/*
 * Handle set_info requests on the OSC.  Several keys are served locally
 * (checksum toggle, sptlrpc config/ctx flush, client cache attach, LRU
 * shrink); anything else is forwarded to the OST as an OST_SET_INFO RPC,
 * with KEY_GRANT_SHRINK getting special request format, async-arg setup
 * and ptlrpcd queuing (other keys go on the caller's @set).
 */
2344 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2345 u32 keylen, void *key,
2346 u32 vallen, void *val,
2347 struct ptlrpc_request_set *set)
2349 struct ptlrpc_request *req;
2350 struct obd_device *obd = exp->exp_obd;
2351 struct obd_import *imp = class_exp2cliimp(exp);
2356 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* Locally-handled keys: no RPC needed. */
2358 if (KEY_IS(KEY_CHECKSUM)) {
2359 if (vallen != sizeof(int))
2361 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2365 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2366 sptlrpc_conf_client_adapt(obd);
2370 if (KEY_IS(KEY_FLUSH_CTX)) {
2371 sptlrpc_import_flush_my_ctx(imp);
2375 if (KEY_IS(KEY_CACHE_SET)) {
2376 struct client_obd *cli = &obd->u.cli;
2378 LASSERT(cli->cl_cache == NULL); /* only once */
2379 cli->cl_cache = (struct cl_client_cache *)val;
2380 cl_cache_incref(cli->cl_cache);
2381 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2383 /* add this osc into entity list */
2384 LASSERT(list_empty(&cli->cl_lru_osc));
2385 spin_lock(&cli->cl_cache->ccc_lru_lock);
2386 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2387 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2392 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2393 struct client_obd *cli = &obd->u.cli;
/* Shrink at most half of this OSC's LRU, capped at the target. */
2394 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2395 long target = *(long *)val;
2397 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* All remaining keys need a request set, except grant shrink
 * which is queued on ptlrpcd below. */
2402 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2405 /* We pass all other commands directly to OST. Since nobody calls osc
2406 methods directly and everybody is supposed to go through LOV, we
2407 assume lov checked invalid values for us.
2408 The only recognised values so far are evict_by_nid and mds_conn.
2409 Even if something bad goes through, we'd get a -EINVAL from OST
2412 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2413 &RQF_OST_SET_GRANT_INFO :
2418 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2419 RCL_CLIENT, keylen);
2420 if (!KEY_IS(KEY_GRANT_SHRINK))
2421 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2422 RCL_CLIENT, vallen);
2423 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2425 ptlrpc_request_free(req);
/* Copy key and value into the request buffers. */
2429 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2430 memcpy(tmp, key, keylen);
2431 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2434 memcpy(tmp, val, vallen);
2436 if (KEY_IS(KEY_GRANT_SHRINK)) {
2437 struct osc_grant_args *aa;
2440 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2441 aa = ptlrpc_req_async_args(req);
2444 ptlrpc_req_finished(req);
/* Keep a copy of the oa for the shrink-grant interpret callback. */
2447 *oa = ((struct ost_body *)val)->oa;
2449 req->rq_interpret_reply = osc_shrink_grant_interpret;
2452 ptlrpc_request_set_replen(req);
2453 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2454 LASSERT(set != NULL);
2455 ptlrpc_set_add_req(set, req);
2456 ptlrpc_check_set(NULL, set);
2458 ptlrpcd_add_req(req);
/*
 * osc_reconnect() - recompute grant to request when reconnecting to the OST.
 *
 * On reconnect, the client asks the server for enough grant to cover its
 * currently available + reserved grant plus outstanding dirty data; lost
 * grant is zeroed under the loi list lock.  If no grant is held at all,
 * ask for two full BRW RPCs worth as a reasonable starting point.
 */
2464 static int osc_reconnect(const struct lu_env *env,
2465 struct obd_export *exp, struct obd_device *obd,
2466 struct obd_uuid *cluuid,
2467 struct obd_connect_data *data,
2470 struct client_obd *cli = &obd->u.cli;
2472 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2476 spin_lock(&cli->cl_loi_list_lock);
2477 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
/* With GRANT_PARAM the server understands byte-granular dirty
 * grant; otherwise account dirty data in whole pages. */
2478 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2479 grant += cli->cl_dirty_grant;
2481 grant += cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
2482 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2483 lost_grant = cli->cl_lost_grant;
2484 cli->cl_lost_grant = 0;
2485 spin_unlock(&cli->cl_loi_list_lock);
2487 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2488 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2489 data->ocd_version, data->ocd_grant, lost_grant);
/*
 * osc_disconnect() - disconnect this OSC's export from the OST.
 *
 * Delegates to client_disconnect_export(), then removes the client from
 * the grant-shrink list only after the import is known to be destroyed —
 * see the race description below (BUG18662).
 */
2495 static int osc_disconnect(struct obd_export *exp)
2497 struct obd_device *obd = class_exp2obd(exp);
2500 rc = client_disconnect_export(exp);
2502 * Initially we put del_shrink_grant before disconnect_export, but it
2503 * causes the following problem if setup (connect) and cleanup
2504 * (disconnect) are tangled together.
2505 * connect p1 disconnect p2
2506 * ptlrpc_connect_import
2507 * ............... class_manual_cleanup
2510 * ptlrpc_connect_interrupt
2512 * add this client to shrink list
2514 * Bang! pinger trigger the shrink.
2515 * So the osc should be disconnected from the shrink list, after we
2516 * are sure the import has been destroyed. BUG18662
2518 if (obd->u.cli.cl_import == NULL)
2519 osc_del_shrink_grant(&obd->u.cli);
/*
 * osc_ldlm_resource_invalidate() - cfs_hash iterator callback that
 * invalidates the osc_object behind an LDLM resource.
 *
 * Walks the resource's granted lock list, takes a reference on the first
 * osc_object found in a lock's l_ast_data, clears LDLM_FL_CLEANED on each
 * lock (so a later namespace-clean pass can cancel it), then invalidates
 * and releases the object.
 *
 * \param[in] arg  lu_env passed through from cfs_hash_for_each_nolock()
 * \retval 0  always continue iteration
 */
2523 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2524 struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2526 struct lu_env *env = arg;
2527 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2528 struct ldlm_lock *lock;
2529 struct osc_object *osc = NULL;
2533 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
/* Grab (and reference) the object from the first lock carrying one. */
2534 if (lock->l_ast_data != NULL && osc == NULL) {
2535 osc = lock->l_ast_data;
2536 cl_object_get(osc2cl(osc));
2539 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2540 * by the 2nd round of ldlm_namespace_clean() call in
2541 * osc_import_event(). */
2542 ldlm_clear_cleaned(lock);
2547 osc_object_invalidate(env, osc);
2548 cl_object_put(env, osc2cl(osc));
/*
 * osc_import_event() - react to state changes of the import to the OST.
 *
 * DISCON:      drop all grant accounting (server forgot it).
 * INACTIVE:    notify observer (typically LOV) the target is inactive.
 * INVALIDATE:  clean the LDLM namespace, flush pending I/O, invalidate
 *              every cached object, then clean the namespace again to
 *              catch locks whose CLEANED flag was reset in between.
 * ACTIVE/OCD/ACTIVATE/DEACTIVATE: re-init grant where applicable and
 *              propagate the corresponding notification upward.
 */
2554 static int osc_import_event(struct obd_device *obd,
2555 struct obd_import *imp,
2556 enum obd_import_event event)
2558 struct client_obd *cli;
2562 LASSERT(imp->imp_obd == obd);
2565 case IMP_EVENT_DISCON: {
/* Server state is gone; discard local grant bookkeeping. */
2567 spin_lock(&cli->cl_loi_list_lock);
2568 cli->cl_avail_grant = 0;
2569 cli->cl_lost_grant = 0;
2570 spin_unlock(&cli->cl_loi_list_lock);
2573 case IMP_EVENT_INACTIVE: {
2574 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2577 case IMP_EVENT_INVALIDATE: {
2578 struct ldlm_namespace *ns = obd->obd_namespace;
/* First pass: cancel local locks. */
2582 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2584 env = cl_env_get(&refcheck);
/* Flush any queued I/O before invalidating cached objects. */
2586 osc_io_unplug(env, &obd->u.cli, NULL);
2588 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2589 osc_ldlm_resource_invalidate,
2591 cl_env_put(env, &refcheck);
/* Second pass: catch locks un-CLEANED by the invalidate walk. */
2593 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2598 case IMP_EVENT_ACTIVE: {
2599 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2602 case IMP_EVENT_OCD: {
2603 struct obd_connect_data *ocd = &imp->imp_connect_data;
2605 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2606 osc_init_grant(&obd->u.cli, ocd);
/* Server supports a dedicated request portal for bulk I/O. */
2609 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2610 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2612 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2615 case IMP_EVENT_DEACTIVATE: {
2616 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2619 case IMP_EVENT_ACTIVATE: {
2620 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2624 CERROR("Unknown import event %d\n", event);
2631 * Determine whether the lock can be canceled before replaying the lock
2632 * during recovery, see bug16774 for detailed information.
2634 * \retval zero the lock can't be canceled
2635 * \retval other ok to cancel
2637 static int osc_cancel_weight(struct ldlm_lock *lock)
2640 * Cancel all unused and granted extent lock.
/* Cancelable iff: extent lock, fully granted (granted mode equals
 * requested mode), and unused per osc_ldlm_weigh_ast() == 0. */
2642 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2643 lock->l_granted_mode == lock->l_req_mode &&
2644 osc_ldlm_weigh_ast(lock) == 0)
/*
 * brw_queue_work() - ptlrpcd work callback that flushes this client's
 * pending writeback by unplugging queued I/O.
 */
2650 static int brw_queue_work(const struct lu_env *env, void *data)
2652 struct client_obd *cli = data;
2654 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2656 osc_io_unplug(env, cli, NULL);
/*
 * osc_setup() - set up an OSC obd device.
 *
 * Order of initialization: ptlrpcd reference, generic client obd setup,
 * writeback and LRU ptlrpcd work items, quota, grant-shrink interval,
 * procfs entries, request-pool top-up, and finally registration on the
 * grant-shrink list and the global cache-shrink list.  Error paths
 * (labels elided from this view) tear down in reverse order.
 *
 * NOTE(review): intermediate error checks/labels are elided here.
 */
2660 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2662 struct client_obd *cli = &obd->u.cli;
2663 struct obd_type *type;
2671 rc = ptlrpcd_addref();
2675 rc = client_obd_setup(obd, lcfg);
2677 GOTO(out_ptlrpcd, rc);
/* Per-client ptlrpcd work item that drives writeback (brw_queue_work). */
2679 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2680 if (IS_ERR(handler))
2681 GOTO(out_client_setup, rc = PTR_ERR(handler));
2682 cli->cl_writeback_work = handler;
/* Second work item for LRU page reclaim. */
2684 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2685 if (IS_ERR(handler))
2686 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2687 cli->cl_lru_work = handler;
2689 rc = osc_quota_setup(obd);
2691 GOTO(out_ptlrpcd_work, rc);
2693 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2695 #ifdef CONFIG_PROC_FS
2696 obd->obd_vars = lprocfs_osc_obd_vars;
2698 /* If this is true then both client (osc) and server (osp) are on the
2699 * same node. The osp layer if loaded first will register the osc proc
2700 * directory. In that case this obd_device will be attached its proc
2701 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2702 type = class_search_type(LUSTRE_OSP_NAME);
2703 if (type && type->typ_procsym) {
2704 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2706 obd->obd_vars, obd);
/* proc registration failure is non-fatal: log and continue. */
2707 if (IS_ERR(obd->obd_proc_entry)) {
2708 rc = PTR_ERR(obd->obd_proc_entry);
2709 CERROR("error %d setting up lprocfs for %s\n", rc,
2711 obd->obd_proc_entry = NULL;
2714 rc = lprocfs_obd_setup(obd);
2717 /* If the basic OSC proc tree construction succeeded then
2718 * lets do the rest. */
2720 lproc_osc_attach_seqstat(obd);
2721 sptlrpc_lprocfs_cliobd_attach(obd);
2722 ptlrpc_lprocfs_register_obd(obd);
2726 * We try to control the total number of requests with a upper limit
2727 * osc_reqpool_maxreqcount. There might be some race which will cause
2728 * over-limit allocation, but it is fine.
2730 req_count = atomic_read(&osc_pool_req_count);
2731 if (req_count < osc_reqpool_maxreqcount) {
2732 adding = cli->cl_max_rpcs_in_flight + 2;
2733 if (req_count + adding > osc_reqpool_maxreqcount)
2734 adding = osc_reqpool_maxreqcount - req_count;
2736 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2737 atomic_add(added, &osc_pool_req_count);
2740 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
/* Let the LDLM pool decide cancelability via our weight callback. */
2741 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
/* Join the global list walked by the memory shrinker. */
2743 spin_lock(&osc_shrink_lock);
2744 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2745 spin_unlock(&osc_shrink_lock);
/* Error path: destroy any work items created above. */
2750 if (cli->cl_writeback_work != NULL) {
2751 ptlrpcd_destroy_work(cli->cl_writeback_work);
2752 cli->cl_writeback_work = NULL;
2754 if (cli->cl_lru_work != NULL) {
2755 ptlrpcd_destroy_work(cli->cl_lru_work);
2756 cli->cl_lru_work = NULL;
2759 client_obd_cleanup(obd);
/*
 * osc_precleanup() - first phase of OSC teardown.
 *
 * Waits for the zombie-export thread (needed for echo-client exports),
 * destroys the writeback and LRU work items, cleans up the client
 * import, and unregisters the procfs entries.
 */
2765 static int osc_precleanup(struct obd_device *obd)
2767 struct client_obd *cli = &obd->u.cli;
2771 * for echo client, export may be on zombie list, wait for
2772 * zombie thread to cull it, because cli.cl_import will be
2773 * cleared in client_disconnect_export():
2774 * class_export_destroy() -> obd_cleanup() ->
2775 * echo_device_free() -> echo_client_cleanup() ->
2776 * obd_disconnect() -> osc_disconnect() ->
2777 * client_disconnect_export()
2779 obd_zombie_barrier();
2780 if (cli->cl_writeback_work) {
2781 ptlrpcd_destroy_work(cli->cl_writeback_work);
2782 cli->cl_writeback_work = NULL;
2785 if (cli->cl_lru_work) {
2786 ptlrpcd_destroy_work(cli->cl_lru_work);
2787 cli->cl_lru_work = NULL;
2790 obd_cleanup_client_import(obd);
2791 ptlrpc_lprocfs_unregister_obd(obd);
2792 lprocfs_obd_cleanup(obd);
/*
 * osc_cleanup() - final phase of OSC teardown.
 *
 * Removes the client from the global shrink list, detaches it from the
 * shared client cache LRU (dropping the cache reference), frees the
 * quota cache, and finishes with the generic client obd cleanup.
 */
2796 int osc_cleanup(struct obd_device *obd)
2798 struct client_obd *cli = &obd->u.cli;
2803 spin_lock(&osc_shrink_lock);
2804 list_del(&cli->cl_shrink_list);
2805 spin_unlock(&osc_shrink_lock);
/* lru cleanup: undo the KEY_CACHE_SET attachment, if any. */
2808 if (cli->cl_cache != NULL) {
2809 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2810 spin_lock(&cli->cl_cache->ccc_lru_lock);
2811 list_del_init(&cli->cl_lru_osc);
2812 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2813 cli->cl_lru_left = NULL;
2814 cl_cache_decref(cli->cl_cache);
2815 cli->cl_cache = NULL;
2818 /* free memory of osc quota cache */
2819 osc_quota_cleanup(obd);
2821 rc = client_obd_cleanup(obd);
/*
 * osc_process_config_base() - apply a configuration record by mapping it
 * onto the OSC proc parameters.  class_process_proc_param() returns > 0
 * for "handled"; normalize that to 0 for callers.
 */
2827 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2829 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2830 return rc > 0 ? 0: rc;
/* obd_ops wrapper: adapt the o_process_config signature onto
 * osc_process_config_base() (the @len argument is unused). */
2833 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2835 return osc_process_config_base(obd, buf);
/*
 * OBD method table for the OSC device type.  Generic client_* handlers
 * are used for connection management; everything else is OSC-specific.
 */
2838 static struct obd_ops osc_obd_ops = {
2839 .o_owner = THIS_MODULE,
2840 .o_setup = osc_setup,
2841 .o_precleanup = osc_precleanup,
2842 .o_cleanup = osc_cleanup,
2843 .o_add_conn = client_import_add_conn,
2844 .o_del_conn = client_import_del_conn,
2845 .o_connect = client_connect_import,
2846 .o_reconnect = osc_reconnect,
2847 .o_disconnect = osc_disconnect,
2848 .o_statfs = osc_statfs,
2849 .o_statfs_async = osc_statfs_async,
2850 .o_create = osc_create,
2851 .o_destroy = osc_destroy,
2852 .o_getattr = osc_getattr,
2853 .o_setattr = osc_setattr,
2854 .o_iocontrol = osc_iocontrol,
2855 .o_set_info_async = osc_set_info_async,
2856 .o_import_event = osc_import_event,
2857 .o_process_config = osc_process_config,
2858 .o_quotactl = osc_quotactl,
/* Memory shrinker registered in osc_init(); walks osc_shrink_list, the
 * global list of client_obds (protected by osc_shrink_lock). */
2861 static struct shrinker *osc_cache_shrinker;
2862 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
2863 DEFINE_SPINLOCK(osc_shrink_lock);
/*
 * Compatibility shim for kernels without the split count/scan shrinker
 * API: emulate the old single-callback interface by invoking scan then
 * returning the count.
 */
2865 #ifndef HAVE_SHRINKER_COUNT
2866 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2868 struct shrink_control scv = {
2869 .nr_to_scan = shrink_param(sc, nr_to_scan),
2870 .gfp_mask = shrink_param(sc, gfp_mask)
2872 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2873 struct shrinker *shrinker = NULL;
2876 (void)osc_cache_shrink_scan(shrinker, &scv);
2878 return osc_cache_shrink_count(shrinker, &scv);
/*
 * osc_init() - module initialization.
 *
 * Initializes the lu_kmem caches, registers the OSC obd type (proc
 * disabled when OSP owns the proc directory), installs the cache
 * shrinker, and sizes/creates the shared request pool from
 * osc_reqpool_mem_max (MB).  Error paths unwind in reverse
 * (labels elided from this view).
 */
2882 static int __init osc_init(void)
2884 bool enable_proc = true;
2885 struct obd_type *type;
2886 unsigned int reqpool_size;
2887 unsigned int reqsize;
2889 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
2890 osc_cache_shrink_count, osc_cache_shrink_scan);
2893 /* print an address of _any_ initialized kernel symbol from this
2894 * module, to allow debugging with gdb that doesn't support data
2895 * symbols from modules.*/
2896 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2898 rc = lu_kmem_init(osc_caches);
/* If OSP already registered the shared proc directory, skip proc here. */
2902 type = class_search_type(LUSTRE_OSP_NAME);
2903 if (type != NULL && type->typ_procsym != NULL)
2904 enable_proc = false;
2906 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2907 LUSTRE_OSC_NAME, &osc_device_type);
2911 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
2913 /* This is obviously too much memory, only prevent overflow here */
2914 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
2915 GOTO(out_type, rc = -EINVAL);
2917 reqpool_size = osc_reqpool_mem_max << 20;
/* Round the per-request size up to a power of two >= OST_IO_MAXREQSIZE. */
2920 while (reqsize < OST_IO_MAXREQSIZE)
2921 reqsize = reqsize << 1;
2924 * We don't enlarge the request count in OSC pool according to
2925 * cl_max_rpcs_in_flight. The allocation from the pool will only be
2926 * tried after normal allocation failed. So a small OSC pool won't
2927 * cause much performance degression in most of cases.
2929 osc_reqpool_maxreqcount = reqpool_size / reqsize;
2931 atomic_set(&osc_pool_req_count, 0);
2932 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
2933 ptlrpc_add_rqs_to_pool);
2935 if (osc_rq_pool != NULL)
/* Error unwind: unregister type, then free caches. */
2939 class_unregister_type(LUSTRE_OSC_NAME);
2941 lu_kmem_fini(osc_caches);
/*
 * osc_exit() - module teardown: remove the shrinker, unregister the obd
 * type, free the lu_kmem caches and the shared request pool (reverse of
 * osc_init()).
 */
2946 static void __exit osc_exit(void)
2948 remove_shrinker(osc_cache_shrinker);
2949 class_unregister_type(LUSTRE_OSC_NAME);
2950 lu_kmem_fini(osc_caches);
2951 ptlrpc_free_rq_pool(osc_rq_pool);
/* Standard kernel module metadata and entry/exit point registration. */
2954 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2955 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2956 MODULE_VERSION(LUSTRE_VERSION_STRING);
2957 MODULE_LICENSE("GPL");
2959 module_init(osc_init);
2960 module_exit(osc_exit);