4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2015, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
53 #include <lustre_net.h>
54 #include "osc_internal.h"
55 #include "osc_cl_internal.h"
57 atomic_t osc_pool_req_count;
58 unsigned int osc_reqpool_maxreqcount;
59 struct ptlrpc_request_pool *osc_rq_pool;
61 /* max memory used for request pool, unit is MB */
62 static unsigned int osc_reqpool_mem_max = 5;
63 module_param(osc_reqpool_mem_max, uint, 0444);
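/*
 * Note: osc_reqpool_maxreqcount above is presumably derived at init time
 * from this cap (osc_reqpool_mem_max << 20 divided by the size of a single
 * pooled BRW request); with the 5 MB default that works out to at most a
 * few thousand pre-allocated requests.  Treat these numbers as a rough
 * sketch, not a guarantee.
 */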
65 struct osc_brw_async_args {
71 struct brw_page **aa_ppga;
72 struct client_obd *aa_cli;
73 struct list_head aa_oaps;
74 struct list_head aa_exts;
77 #define osc_grant_args osc_brw_async_args
79 struct osc_setattr_args {
81 obd_enqueue_update_f sa_upcall;
85 struct osc_fsync_args {
86 struct osc_object *fa_obj;
88 obd_enqueue_update_f fa_upcall;
92 struct osc_enqueue_args {
93 struct obd_export *oa_exp;
94 enum ldlm_type oa_type;
95 enum ldlm_mode oa_mode;
97 osc_enqueue_upcall_f oa_upcall;
99 struct ost_lvb *oa_lvb;
100 struct lustre_handle oa_lockh;
101 unsigned int oa_agl:1;
104 static void osc_release_ppga(struct brw_page **ppga, size_t count);
105 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
108 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
110 struct ost_body *body;
112 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
115 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
118 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
121 struct ptlrpc_request *req;
122 struct ost_body *body;
126 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
130 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
132 ptlrpc_request_free(req);
136 osc_pack_req_body(req, oa);
138 ptlrpc_request_set_replen(req);
140 rc = ptlrpc_queue_wait(req);
144 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
146 GOTO(out, rc = -EPROTO);
148 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
149 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
151 oa->o_blksize = cli_brw_size(exp->exp_obd);
152 oa->o_valid |= OBD_MD_FLBLKSZ;
156 ptlrpc_req_finished(req);
161 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
164 struct ptlrpc_request *req;
165 struct ost_body *body;
169 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
171 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
175 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
177 ptlrpc_request_free(req);
181 osc_pack_req_body(req, oa);
183 ptlrpc_request_set_replen(req);
185 rc = ptlrpc_queue_wait(req);
189 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
191 GOTO(out, rc = -EPROTO);
193 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
197 ptlrpc_req_finished(req);
202 static int osc_setattr_interpret(const struct lu_env *env,
203 struct ptlrpc_request *req,
204 struct osc_setattr_args *sa, int rc)
206 struct ost_body *body;
212 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214 GOTO(out, rc = -EPROTO);
216 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
219 rc = sa->sa_upcall(sa->sa_cookie, rc);
223 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
224 obd_enqueue_update_f upcall, void *cookie,
225 struct ptlrpc_request_set *rqset)
227 struct ptlrpc_request *req;
228 struct osc_setattr_args *sa;
233 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
237 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
239 ptlrpc_request_free(req);
243 osc_pack_req_body(req, oa);
245 ptlrpc_request_set_replen(req);
247 /* do mds to ost setattr asynchronously */
249 /* Do not wait for response. */
250 ptlrpcd_add_req(req);
252 req->rq_interpret_reply =
253 (ptlrpc_interpterer_t)osc_setattr_interpret;
255 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
256 sa = ptlrpc_req_async_args(req);
258 sa->sa_upcall = upcall;
259 sa->sa_cookie = cookie;
261 if (rqset == PTLRPCD_SET)
262 ptlrpcd_add_req(req);
264 ptlrpc_set_add_req(rqset, req);
270 static int osc_create(const struct lu_env *env, struct obd_export *exp,
273 struct ptlrpc_request *req;
274 struct ost_body *body;
279 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
280 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
282 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
284 GOTO(out, rc = -ENOMEM);
286 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
288 ptlrpc_request_free(req);
292 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
295 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
297 ptlrpc_request_set_replen(req);
299 rc = ptlrpc_queue_wait(req);
303 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
305 GOTO(out_req, rc = -EPROTO);
307 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
308 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
310 oa->o_blksize = cli_brw_size(exp->exp_obd);
311 oa->o_valid |= OBD_MD_FLBLKSZ;
313 CDEBUG(D_HA, "transno: "LPD64"\n",
314 lustre_msg_get_transno(req->rq_repmsg));
316 ptlrpc_req_finished(req);
321 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
322 obd_enqueue_update_f upcall, void *cookie,
323 struct ptlrpc_request_set *rqset)
325 struct ptlrpc_request *req;
326 struct osc_setattr_args *sa;
327 struct ost_body *body;
331 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
335 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
337 ptlrpc_request_free(req);
340 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
341 ptlrpc_at_set_req_timeout(req);
343 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
345 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
347 ptlrpc_request_set_replen(req);
349 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
350 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
351 sa = ptlrpc_req_async_args(req);
353 sa->sa_upcall = upcall;
354 sa->sa_cookie = cookie;
355 if (rqset == PTLRPCD_SET)
356 ptlrpcd_add_req(req);
358 ptlrpc_set_add_req(rqset, req);
363 static int osc_sync_interpret(const struct lu_env *env,
364 struct ptlrpc_request *req,
367 struct osc_fsync_args *fa = arg;
368 struct ost_body *body;
369 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
370 unsigned long valid = 0;
371 struct cl_object *obj;
377 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
379 CERROR("can't unpack ost_body\n");
380 GOTO(out, rc = -EPROTO);
383 *fa->fa_oa = body->oa;
384 obj = osc2cl(fa->fa_obj);
386 /* Update osc object's blocks attribute */
387 cl_object_attr_lock(obj);
388 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
389 attr->cat_blocks = body->oa.o_blocks;
394 cl_object_attr_update(env, obj, attr, valid);
395 cl_object_attr_unlock(obj);
398 rc = fa->fa_upcall(fa->fa_cookie, rc);
402 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
403 obd_enqueue_update_f upcall, void *cookie,
404 struct ptlrpc_request_set *rqset)
406 struct obd_export *exp = osc_export(obj);
407 struct ptlrpc_request *req;
408 struct ost_body *body;
409 struct osc_fsync_args *fa;
413 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
417 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
419 ptlrpc_request_free(req);
423 /* overload the size and blocks fields in the oa with start/end */
424 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
426 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
428 ptlrpc_request_set_replen(req);
429 req->rq_interpret_reply = osc_sync_interpret;
431 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
432 fa = ptlrpc_req_async_args(req);
435 fa->fa_upcall = upcall;
436 fa->fa_cookie = cookie;
438 if (rqset == PTLRPCD_SET)
439 ptlrpcd_add_req(req);
441 ptlrpc_set_add_req(rqset, req);
446 /* Find and locally cancel the locks matched by @mode in the resource found by
447  * @objid. Found locks are added into the @cancels list. Returns the number of
448  * locks added to the @cancels list. */
449 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
450 struct list_head *cancels,
451 enum ldlm_mode mode, __u64 lock_flags)
453 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
454 struct ldlm_res_id res_id;
455 struct ldlm_resource *res;
459 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
460 * export) but disabled through procfs (flag in NS).
462  * This distinguishes it from the case when ELC is not supported at all,
463 * when we still want to cancel locks in advance and just cancel them
464 * locally, without sending any RPC. */
465 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
468 ostid_build_res_name(&oa->o_oi, &res_id);
469 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
473 LDLM_RESOURCE_ADDREF(res);
474 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
475 lock_flags, 0, NULL);
476 LDLM_RESOURCE_DELREF(res);
477 ldlm_resource_putref(res);
481 static int osc_destroy_interpret(const struct lu_env *env,
482 struct ptlrpc_request *req, void *data,
485 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
487 atomic_dec(&cli->cl_destroy_in_flight);
488 wake_up(&cli->cl_destroy_waitq);
492 static int osc_can_send_destroy(struct client_obd *cli)
494 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
495 cli->cl_max_rpcs_in_flight) {
496 /* The destroy request can be sent */
499 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
500 cli->cl_max_rpcs_in_flight) {
502 * The counter has been modified between the two atomic
505 wake_up(&cli->cl_destroy_waitq);
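/*
 * The slot is reserved optimistically: atomic_inc_return() claims it, and
 * if the count overshoots cl_max_rpcs_in_flight the claim is dropped again
 * with atomic_dec_return().  If the counter fell back below the limit
 * between the two operations, another destroy completed in the meantime,
 * so wake a waiter rather than risk leaving it asleep.
 */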
510 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
513 struct client_obd *cli = &exp->exp_obd->u.cli;
514 struct ptlrpc_request *req;
515 struct ost_body *body;
516 struct list_head cancels = LIST_HEAD_INIT(cancels);
521 CDEBUG(D_INFO, "oa NULL\n");
525 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
526 LDLM_FL_DISCARD_DATA);
528 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
530 ldlm_lock_list_put(&cancels, l_bl_ast, count);
534 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
537 ptlrpc_request_free(req);
541 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
542 ptlrpc_at_set_req_timeout(req);
544 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
546 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
548 ptlrpc_request_set_replen(req);
550 req->rq_interpret_reply = osc_destroy_interpret;
551 if (!osc_can_send_destroy(cli)) {
552 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
555 		 * Wait until the number of ongoing destroy RPCs drops
556 		 * below cl_max_rpcs_in_flight
558 l_wait_event_exclusive(cli->cl_destroy_waitq,
559 osc_can_send_destroy(cli), &lwi);
562 /* Do not wait for response */
563 ptlrpcd_add_req(req);
567 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
570 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
572 LASSERT(!(oa->o_valid & bits));
575 spin_lock(&cli->cl_loi_list_lock);
576 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
577 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
578 cli->cl_dirty_max_pages)) {
579 CERROR("dirty %lu - %lu > dirty_max %lu\n",
580 cli->cl_dirty_pages, cli->cl_dirty_transit,
581 cli->cl_dirty_max_pages);
583 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
584 atomic_long_read(&obd_dirty_transit_pages) >
585 (long)(obd_max_dirty_pages + 1))) {
586 		/* The atomic_read() and atomic_inc() calls are
587 		 * not covered by a lock, thus they may safely race and trip
588 * this CERROR() unless we add in a small fudge factor (+1). */
589 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
590 cli_name(cli), atomic_long_read(&obd_dirty_pages),
591 atomic_long_read(&obd_dirty_transit_pages),
592 obd_max_dirty_pages);
594 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
596 CERROR("dirty %lu - dirty_max %lu too big???\n",
597 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
600 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
602 (cli->cl_max_rpcs_in_flight + 1);
603 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
606 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
607 oa->o_dropped = cli->cl_lost_grant;
608 cli->cl_lost_grant = 0;
609 spin_unlock(&cli->cl_loi_list_lock);
610 	CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
611 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
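/*
 * Rough worked example (assumed values, for illustration only): with
 * cl_max_pages_per_rpc = 256, 4 KiB pages and cl_max_rpcs_in_flight = 8,
 * max_in_flight = 256 * 4 KiB * (8 + 1) = 9 MiB.  o_undirty then
 * advertises at least the dirty cache limit, presumably padded up to
 * max_in_flight so the server grants enough for a full RPC pipeline.
 */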
615 void osc_update_next_shrink(struct client_obd *cli)
617 cli->cl_next_shrink_grant =
618 cfs_time_shift(cli->cl_grant_shrink_interval);
619 	CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
620 cli->cl_next_shrink_grant);
623 static void __osc_update_grant(struct client_obd *cli, u64 grant)
625 spin_lock(&cli->cl_loi_list_lock);
626 cli->cl_avail_grant += grant;
627 spin_unlock(&cli->cl_loi_list_lock);
630 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
632 if (body->oa.o_valid & OBD_MD_FLGRANT) {
633 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
634 __osc_update_grant(cli, body->oa.o_grant);
638 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
639 u32 keylen, void *key,
640 u32 vallen, void *val,
641 struct ptlrpc_request_set *set);
643 static int osc_shrink_grant_interpret(const struct lu_env *env,
644 struct ptlrpc_request *req,
647 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
648 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
649 struct ost_body *body;
652 __osc_update_grant(cli, oa->o_grant);
656 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
658 osc_update_grant(cli, body);
664 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
666 spin_lock(&cli->cl_loi_list_lock);
667 oa->o_grant = cli->cl_avail_grant / 4;
668 cli->cl_avail_grant -= oa->o_grant;
669 spin_unlock(&cli->cl_loi_list_lock);
670 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
671 oa->o_valid |= OBD_MD_FLFLAGS;
674 oa->o_flags |= OBD_FL_SHRINK_GRANT;
675 osc_update_next_shrink(cli);
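/*
 * In effect a quarter of the currently unused grant is handed back by
 * piggybacking it on this RPC's obdo: OBD_FL_SHRINK_GRANT tells the
 * server that o_grant is a voluntary give-back rather than a request,
 * and the next scheduled shrink is pushed out.
 */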
678 /* Shrink the current grant, either from some large amount to enough for a
679 * full set of in-flight RPCs, or if we have already shrunk to that limit
680 * then to enough for a single RPC. This avoids keeping more grant than
681 * needed, and avoids shrinking the grant piecemeal. */
682 static int osc_shrink_grant(struct client_obd *cli)
684 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
685 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
687 spin_lock(&cli->cl_loi_list_lock);
688 if (cli->cl_avail_grant <= target_bytes)
689 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
690 spin_unlock(&cli->cl_loi_list_lock);
692 return osc_shrink_grant_to_target(cli, target_bytes);
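/*
 * Example with assumed values: 8 RPCs in flight allowed and 256 pages of
 * 4 KiB per RPC give a first target of (8 + 1) * 1 MiB = 9 MiB; once
 * cl_avail_grant is already at or below that, the target collapses to a
 * single RPC's worth (1 MiB).
 */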
695 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
698 struct ost_body *body;
701 spin_lock(&cli->cl_loi_list_lock);
702 	/* Don't shrink if we are already above or below the desired limit.
703 * We don't want to shrink below a single RPC, as that will negatively
704 * impact block allocation and long-term performance. */
705 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
706 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
708 if (target_bytes >= cli->cl_avail_grant) {
709 spin_unlock(&cli->cl_loi_list_lock);
712 spin_unlock(&cli->cl_loi_list_lock);
718 osc_announce_cached(cli, &body->oa, 0);
720 spin_lock(&cli->cl_loi_list_lock);
721 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
722 cli->cl_avail_grant = target_bytes;
723 spin_unlock(&cli->cl_loi_list_lock);
724 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
725 body->oa.o_valid |= OBD_MD_FLFLAGS;
726 body->oa.o_flags = 0;
728 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
729 osc_update_next_shrink(cli);
731 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
732 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
733 sizeof(*body), body, NULL);
735 __osc_update_grant(cli, body->oa.o_grant);
740 static int osc_should_shrink_grant(struct client_obd *client)
742 cfs_time_t time = cfs_time_current();
743 cfs_time_t next_shrink = client->cl_next_shrink_grant;
745 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
746 OBD_CONNECT_GRANT_SHRINK) == 0)
749 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
750 /* Get the current RPC size directly, instead of going via:
751 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
752 * Keep comment here so that it can be found by searching. */
753 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
755 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
756 client->cl_avail_grant > brw_size)
759 osc_update_next_shrink(client);
764 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
766 struct client_obd *client;
768 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
769 if (osc_should_shrink_grant(client))
770 osc_shrink_grant(client);
775 static int osc_add_shrink_grant(struct client_obd *client)
779 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
781 osc_grant_shrink_grant_cb, NULL,
782 &client->cl_grant_shrink_list);
784 CERROR("add grant client %s error %d\n", cli_name(client), rc);
787 CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
788 osc_update_next_shrink(client);
792 static int osc_del_shrink_grant(struct client_obd *client)
794 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
798 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
801 	 * ocd_grant is the total grant amount we're expected to hold: if we've
802 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
803 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
806 * race is tolerable here: if we're evicted, but imp_state already
807 * left EVICTED state, then cl_dirty_pages must be 0 already.
809 spin_lock(&cli->cl_loi_list_lock);
810 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
811 cli->cl_avail_grant = ocd->ocd_grant;
813 cli->cl_avail_grant = ocd->ocd_grant -
814 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
816 if (cli->cl_avail_grant < 0) {
817 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
818 cli_name(cli), cli->cl_avail_grant,
819 ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
820 /* workaround for servers which do not have the patch from
822 cli->cl_avail_grant = ocd->ocd_grant;
825 /* determine the appropriate chunk size used by osc_extent. */
826 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
827 spin_unlock(&cli->cl_loi_list_lock);
829 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
830 "chunk bits: %d.\n", cli_name(cli), cli->cl_avail_grant,
831 cli->cl_lost_grant, cli->cl_chunkbits);
833 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
834 list_empty(&cli->cl_grant_shrink_list))
835 osc_add_shrink_grant(cli);
838 /* We assume that the reason this OSC got a short read is that it read
839 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
840 * via the LOV, and it _knows_ it's reading inside the file, it's just that
841 * this stripe never got written at or beyond this stripe offset yet. */
842 static void handle_short_read(int nob_read, size_t page_count,
843 struct brw_page **pga)
848 /* skip bytes read OK */
849 while (nob_read > 0) {
850 LASSERT (page_count > 0);
852 if (pga[i]->count > nob_read) {
853 /* EOF inside this page */
854 ptr = kmap(pga[i]->pg) +
855 (pga[i]->off & ~PAGE_MASK);
856 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
863 nob_read -= pga[i]->count;
868 /* zero remaining pages */
869 while (page_count-- > 0) {
870 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
871 memset(ptr, 0, pga[i]->count);
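/*
 * Net effect: the bytes the OST actually returned are left untouched, the
 * remainder of the page containing EOF is zero-filled, and every later
 * page is zeroed in full, so the short read appears as zeroes beyond the
 * written extent.
 */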
877 static int check_write_rcs(struct ptlrpc_request *req,
878 int requested_nob, int niocount,
879 size_t page_count, struct brw_page **pga)
884 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
885 sizeof(*remote_rcs) *
887 if (remote_rcs == NULL) {
888 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
892 /* return error if any niobuf was in error */
893 for (i = 0; i < niocount; i++) {
894 if ((int)remote_rcs[i] < 0)
895 return(remote_rcs[i]);
897 if (remote_rcs[i] != 0) {
898 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
899 i, remote_rcs[i], req);
904 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
905 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
906 req->rq_bulk->bd_nob_transferred, requested_nob);
913 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
915 if (p1->flag != p2->flag) {
916 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
917 OBD_BRW_SYNC | OBD_BRW_ASYNC |
918 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
920 /* warn if we try to combine flags that we don't know to be
922 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
923 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
924 "report this at https://jira.hpdd.intel.com/\n",
930 return (p1->off + p1->count == p2->off);
933 static u32 osc_checksum_bulk(int nob, size_t pg_count,
934 struct brw_page **pga, int opc,
935 cksum_type_t cksum_type)
939 struct cfs_crypto_hash_desc *hdesc;
940 unsigned int bufsize;
942 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
944 LASSERT(pg_count > 0);
946 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
948 CERROR("Unable to initialize checksum hash %s\n",
949 cfs_crypto_hash_name(cfs_alg));
950 return PTR_ERR(hdesc);
953 while (nob > 0 && pg_count > 0) {
954 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
956 /* corrupt the data before we compute the checksum, to
957 * simulate an OST->client data error */
958 if (i == 0 && opc == OST_READ &&
959 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
960 unsigned char *ptr = kmap(pga[i]->pg);
961 int off = pga[i]->off & ~PAGE_MASK;
963 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
966 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
967 pga[i]->off & ~PAGE_MASK,
969 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
970 (int)(pga[i]->off & ~PAGE_MASK));
972 nob -= pga[i]->count;
977 bufsize = sizeof(cksum);
978 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
980 /* For sending we only compute the wrong checksum instead
981 * of corrupting the data so it is still correct on a redo */
982 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
989 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
990 u32 page_count, struct brw_page **pga,
991 struct ptlrpc_request **reqp, int resend)
993 struct ptlrpc_request *req;
994 struct ptlrpc_bulk_desc *desc;
995 struct ost_body *body;
996 struct obd_ioobj *ioobj;
997 struct niobuf_remote *niobuf;
998 int niocount, i, requested_nob, opc, rc;
999 struct osc_brw_async_args *aa;
1000 struct req_capsule *pill;
1001 struct brw_page *pg_prev;
1004 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1005 RETURN(-ENOMEM); /* Recoverable */
1006 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1007 RETURN(-EINVAL); /* Fatal */
1009 if ((cmd & OBD_BRW_WRITE) != 0) {
1011 req = ptlrpc_request_alloc_pool(cli->cl_import,
1013 &RQF_OST_BRW_WRITE);
1016 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1021 for (niocount = i = 1; i < page_count; i++) {
1022 if (!can_merge_pages(pga[i - 1], pga[i]))
1026 pill = &req->rq_pill;
1027 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1029 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1030 niocount * sizeof(*niobuf));
1032 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1034 ptlrpc_request_free(req);
1037 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1038 ptlrpc_at_set_req_timeout(req);
1039 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1041 req->rq_no_retry_einprogress = 1;
1043 desc = ptlrpc_prep_bulk_imp(req, page_count,
1044 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1045 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1046 PTLRPC_BULK_PUT_SINK) |
1047 PTLRPC_BULK_BUF_KIOV,
1049 &ptlrpc_bulk_kiov_pin_ops);
1052 GOTO(out, rc = -ENOMEM);
1053 /* NB request now owns desc and will free it when it gets freed */
1055 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1056 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1057 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1058 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1060 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1062 obdo_to_ioobj(oa, ioobj);
1063 ioobj->ioo_bufcnt = niocount;
1064 	/* The high bits of ioo_max_brw tell the server the _maximum_ number of bulks
1065 	 * that might be sent for this request. The actual number is decided
1066 	 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1067 	 * "max - 1" for compatibility with old clients that send "0", and also
1068 	 * so that the actual maximum is a power-of-two number, not one less. LU-1431 */
1069 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1070 LASSERT(page_count > 0);
1072 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1073 struct brw_page *pg = pga[i];
1074 int poff = pg->off & ~PAGE_MASK;
1076 LASSERT(pg->count > 0);
1077 /* make sure there is no gap in the middle of page array */
1078 LASSERTF(page_count == 1 ||
1079 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1080 ergo(i > 0 && i < page_count - 1,
1081 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1082 ergo(i == page_count - 1, poff == 0)),
1083 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1084 i, page_count, pg, pg->off, pg->count);
1085 LASSERTF(i == 0 || pg->off > pg_prev->off,
1086 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1087 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1089 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1090 pg_prev->pg, page_private(pg_prev->pg),
1091 pg_prev->pg->index, pg_prev->off);
1092 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1093 (pg->flag & OBD_BRW_SRVLOCK));
1095 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1096 requested_nob += pg->count;
1098 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1100 niobuf->rnb_len += pg->count;
1102 niobuf->rnb_offset = pg->off;
1103 niobuf->rnb_len = pg->count;
1104 niobuf->rnb_flags = pg->flag;
1109 LASSERTF((void *)(niobuf - niocount) ==
1110 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1111 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1112 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
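/*
 * The loop above coalesces byte-contiguous pages with compatible flags
 * into a single niobuf (see can_merge_pages()), which is why only
 * niocount niobufs were reserved; the LASSERTF confirms the niobuf
 * cursor advanced by exactly that many entries.
 */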
1114 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1116 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1117 body->oa.o_valid |= OBD_MD_FLFLAGS;
1118 body->oa.o_flags = 0;
1120 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1123 if (osc_should_shrink_grant(cli))
1124 osc_shrink_grant_local(cli, &body->oa);
1126 /* size[REQ_REC_OFF] still sizeof (*body) */
1127 if (opc == OST_WRITE) {
1128 if (cli->cl_checksum &&
1129 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1130 /* store cl_cksum_type in a local variable since
1131 * it can be changed via lprocfs */
1132 cksum_type_t cksum_type = cli->cl_cksum_type;
1134 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1135 oa->o_flags &= OBD_FL_LOCAL_MASK;
1136 body->oa.o_flags = 0;
1138 body->oa.o_flags |= cksum_type_pack(cksum_type);
1139 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1140 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1144 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1146 /* save this in 'oa', too, for later checking */
1147 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1148 oa->o_flags |= cksum_type_pack(cksum_type);
1150 /* clear out the checksum flag, in case this is a
1151 * resend but cl_checksum is no longer set. b=11238 */
1152 oa->o_valid &= ~OBD_MD_FLCKSUM;
1154 oa->o_cksum = body->oa.o_cksum;
1155 /* 1 RC per niobuf */
1156 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1157 sizeof(__u32) * niocount);
1159 if (cli->cl_checksum &&
1160 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1161 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1162 body->oa.o_flags = 0;
1163 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1164 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1167 ptlrpc_request_set_replen(req);
1169 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1170 aa = ptlrpc_req_async_args(req);
1172 aa->aa_requested_nob = requested_nob;
1173 aa->aa_nio_count = niocount;
1174 aa->aa_page_count = page_count;
1178 INIT_LIST_HEAD(&aa->aa_oaps);
1181 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1182 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1183 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1184 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1188 ptlrpc_req_finished(req);
1192 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1193 __u32 client_cksum, __u32 server_cksum, int nob,
1194 size_t page_count, struct brw_page **pga,
1195 cksum_type_t client_cksum_type)
1199 cksum_type_t cksum_type;
1201 if (server_cksum == client_cksum) {
1202 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1206 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1208 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1211 if (cksum_type != client_cksum_type)
1212 msg = "the server did not use the checksum type specified in "
1213 "the original request - likely a protocol problem";
1214 else if (new_cksum == server_cksum)
1215 msg = "changed on the client after we checksummed it - "
1216 "likely false positive due to mmap IO (bug 11742)";
1217 else if (new_cksum == client_cksum)
1218 msg = "changed in transit before arrival at OST";
1220 msg = "changed in transit AND doesn't match the original - "
1221 "likely false positive due to mmap IO (bug 11742)";
1223 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1224 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1225 msg, libcfs_nid2str(peer->nid),
1226 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1227 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1228 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1229 POSTID(&oa->o_oi), pga[0]->off,
1230 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1231 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1232 "client csum now %x\n", client_cksum, client_cksum_type,
1233 server_cksum, cksum_type, new_cksum);
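/*
 * The recomputed checksum (new_cksum) localizes the corruption: matching
 * the server's value means the pages changed under us after we originally
 * checksummed them (typically mmap I/O), matching our original value means
 * the data was mangled in transit to the OST, and matching neither means
 * both happened, so the comparison proves little.
 */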
1237 /* Note rc enters this function as number of bytes transferred */
1238 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1240 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1241 const lnet_process_id_t *peer =
1242 &req->rq_import->imp_connection->c_peer;
1243 struct client_obd *cli = aa->aa_cli;
1244 struct ost_body *body;
1245 u32 client_cksum = 0;
1248 if (rc < 0 && rc != -EDQUOT) {
1249 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1253 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1254 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1256 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1260 /* set/clear over quota flag for a uid/gid */
1261 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1262 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1263 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1265 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1266 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1268 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1271 osc_update_grant(cli, body);
1276 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1277 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1279 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1281 CERROR("Unexpected +ve rc %d\n", rc);
1284 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1286 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1289 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1290 check_write_checksum(&body->oa, peer, client_cksum,
1291 body->oa.o_cksum, aa->aa_requested_nob,
1292 aa->aa_page_count, aa->aa_ppga,
1293 cksum_type_unpack(aa->aa_oa->o_flags)))
1296 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1297 aa->aa_page_count, aa->aa_ppga);
1301 /* The rest of this function executes only for OST_READs */
1303 /* if unwrap_bulk failed, return -EAGAIN to retry */
1304 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1306 GOTO(out, rc = -EAGAIN);
1308 if (rc > aa->aa_requested_nob) {
1309 CERROR("Unexpected rc %d (%d requested)\n", rc,
1310 aa->aa_requested_nob);
1314 if (rc != req->rq_bulk->bd_nob_transferred) {
1315 CERROR ("Unexpected rc %d (%d transferred)\n",
1316 rc, req->rq_bulk->bd_nob_transferred);
1320 if (rc < aa->aa_requested_nob)
1321 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1323 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1324 static int cksum_counter;
1325 u32 server_cksum = body->oa.o_cksum;
1328 cksum_type_t cksum_type;
1330 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1331 body->oa.o_flags : 0);
1332 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1333 aa->aa_ppga, OST_READ,
1336 if (peer->nid != req->rq_bulk->bd_sender) {
1338 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1341 if (server_cksum != client_cksum) {
1342 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1343 "%s%s%s inode "DFID" object "DOSTID
1344 " extent ["LPU64"-"LPU64"]\n",
1345 req->rq_import->imp_obd->obd_name,
1346 libcfs_nid2str(peer->nid),
1348 body->oa.o_valid & OBD_MD_FLFID ?
1349 body->oa.o_parent_seq : (__u64)0,
1350 body->oa.o_valid & OBD_MD_FLFID ?
1351 body->oa.o_parent_oid : 0,
1352 body->oa.o_valid & OBD_MD_FLFID ?
1353 body->oa.o_parent_ver : 0,
1354 POSTID(&body->oa.o_oi),
1355 aa->aa_ppga[0]->off,
1356 aa->aa_ppga[aa->aa_page_count-1]->off +
1357 aa->aa_ppga[aa->aa_page_count-1]->count -
1359 CERROR("client %x, server %x, cksum_type %x\n",
1360 client_cksum, server_cksum, cksum_type);
1362 aa->aa_oa->o_cksum = client_cksum;
1366 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1369 } else if (unlikely(client_cksum)) {
1370 static int cksum_missed;
1373 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1374 CERROR("Checksum %u requested from %s but not sent\n",
1375 cksum_missed, libcfs_nid2str(peer->nid));
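/*
 * (cksum_missed & (-cksum_missed)) == cksum_missed is true only when the
 * counter is zero or a power of two, so this error is rate-limited to the
 * 1st, 2nd, 4th, 8th, ... occurrence instead of flooding the log.
 */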
1381 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1382 aa->aa_oa, &body->oa);
1387 static int osc_brw_redo_request(struct ptlrpc_request *request,
1388 struct osc_brw_async_args *aa, int rc)
1390 struct ptlrpc_request *new_req;
1391 struct osc_brw_async_args *new_aa;
1392 struct osc_async_page *oap;
1395 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1396 "redo for recoverable error %d", rc);
1398 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1399 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1400 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1401 aa->aa_ppga, &new_req, 1);
1405 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1406 if (oap->oap_request != NULL) {
1407 LASSERTF(request == oap->oap_request,
1408 "request %p != oap_request %p\n",
1409 request, oap->oap_request);
1410 if (oap->oap_interrupted) {
1411 ptlrpc_req_finished(new_req);
1416 /* New request takes over pga and oaps from old request.
1417 * Note that copying a list_head doesn't work, need to move it... */
1419 new_req->rq_interpret_reply = request->rq_interpret_reply;
1420 new_req->rq_async_args = request->rq_async_args;
1421 new_req->rq_commit_cb = request->rq_commit_cb;
1422 /* cap resend delay to the current request timeout, this is similar to
1423 * what ptlrpc does (see after_reply()) */
1424 if (aa->aa_resends > new_req->rq_timeout)
1425 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1427 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
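/* i.e. the Nth resend is deferred by roughly N seconds, but never by more
 * than the request timeout, per the cap above. */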
1428 new_req->rq_generation_set = 1;
1429 new_req->rq_import_generation = request->rq_import_generation;
1431 new_aa = ptlrpc_req_async_args(new_req);
1433 INIT_LIST_HEAD(&new_aa->aa_oaps);
1434 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1435 INIT_LIST_HEAD(&new_aa->aa_exts);
1436 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1437 new_aa->aa_resends = aa->aa_resends;
1439 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1440 if (oap->oap_request) {
1441 ptlrpc_req_finished(oap->oap_request);
1442 oap->oap_request = ptlrpc_request_addref(new_req);
1446 	/* XXX: This code will run into problems if we're ever going to support
1447 	 * adding a series of BRW RPCs into a self-defined ptlrpc_request_set
1448 	 * and waiting for all of them to be finished. We should inherit the
1449 	 * request set from the old request. */
1450 ptlrpcd_add_req(new_req);
1452 DEBUG_REQ(D_INFO, new_req, "new request");
1457 * ugh, we want disk allocation on the target to happen in offset order. we'll
1458  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1459  * fine for our small page arrays and doesn't require allocation. it's an
1460  * insertion sort that swaps elements that are strides apart, shrinking the
1461  * stride down until it's '1' and the array is sorted.
1463 static void sort_brw_pages(struct brw_page **array, int num)
1466 struct brw_page *tmp;
1470 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1475 for (i = stride ; i < num ; i++) {
1478 while (j >= stride && array[j - stride]->off > tmp->off) {
1479 array[j] = array[j - stride];
1484 } while (stride > 1);
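/*
 * The stride generator above yields Knuth's gap sequence 1, 4, 13, 40,
 * 121, ...; the do/while presumably walks it back down (stride /= 3 in the
 * elided body) until the final stride-1 pass leaves the pages sorted by
 * offset.
 */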
1487 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1489 LASSERT(ppga != NULL);
1490 OBD_FREE(ppga, sizeof(*ppga) * count);
1493 static int brw_interpret(const struct lu_env *env,
1494 struct ptlrpc_request *req, void *data, int rc)
1496 struct osc_brw_async_args *aa = data;
1497 struct osc_extent *ext;
1498 struct osc_extent *tmp;
1499 struct client_obd *cli = aa->aa_cli;
1502 rc = osc_brw_fini_request(req, rc);
1503 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1504 	/* When the server returns -EINPROGRESS, the client should always retry
1505 * regardless of the number of times the bulk was resent already. */
1506 if (osc_recoverable_error(rc)) {
1507 if (req->rq_import_generation !=
1508 req->rq_import->imp_generation) {
1509 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1510 ""DOSTID", rc = %d.\n",
1511 req->rq_import->imp_obd->obd_name,
1512 POSTID(&aa->aa_oa->o_oi), rc);
1513 } else if (rc == -EINPROGRESS ||
1514 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1515 rc = osc_brw_redo_request(req, aa, rc);
1517 CERROR("%s: too many resent retries for object: "
1518 ""LPU64":"LPU64", rc = %d.\n",
1519 req->rq_import->imp_obd->obd_name,
1520 POSTID(&aa->aa_oa->o_oi), rc);
1525 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1530 struct obdo *oa = aa->aa_oa;
1531 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1532 unsigned long valid = 0;
1533 struct cl_object *obj;
1534 struct osc_async_page *last;
1536 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1537 obj = osc2cl(last->oap_obj);
1539 cl_object_attr_lock(obj);
1540 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1541 attr->cat_blocks = oa->o_blocks;
1542 valid |= CAT_BLOCKS;
1544 if (oa->o_valid & OBD_MD_FLMTIME) {
1545 attr->cat_mtime = oa->o_mtime;
1548 if (oa->o_valid & OBD_MD_FLATIME) {
1549 attr->cat_atime = oa->o_atime;
1552 if (oa->o_valid & OBD_MD_FLCTIME) {
1553 attr->cat_ctime = oa->o_ctime;
1557 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1558 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1559 loff_t last_off = last->oap_count + last->oap_obj_off +
1562 /* Change file size if this is an out of quota or
1563 * direct IO write and it extends the file size */
1564 if (loi->loi_lvb.lvb_size < last_off) {
1565 attr->cat_size = last_off;
1568 /* Extend KMS if it's not a lockless write */
1569 if (loi->loi_kms < last_off &&
1570 oap2osc_page(last)->ops_srvlock == 0) {
1571 attr->cat_kms = last_off;
1577 cl_object_attr_update(env, obj, attr, valid);
1578 cl_object_attr_unlock(obj);
1580 OBDO_FREE(aa->aa_oa);
1582 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1583 osc_inc_unstable_pages(req);
1585 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1586 list_del_init(&ext->oe_link);
1587 osc_extent_finish(env, ext, 1, rc);
1589 LASSERT(list_empty(&aa->aa_exts));
1590 LASSERT(list_empty(&aa->aa_oaps));
1592 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1593 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1595 spin_lock(&cli->cl_loi_list_lock);
1596 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1597 * is called so we know whether to go to sync BRWs or wait for more
1598 * RPCs to complete */
1599 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1600 cli->cl_w_in_flight--;
1602 cli->cl_r_in_flight--;
1603 osc_wake_cache_waiters(cli);
1604 spin_unlock(&cli->cl_loi_list_lock);
1606 osc_io_unplug(env, cli, NULL);
1610 static void brw_commit(struct ptlrpc_request *req)
1612 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1613 	 * this being called via the rq_commit_cb, we need to ensure
1614 * osc_dec_unstable_pages is still called. Otherwise unstable
1615 * pages may be leaked. */
1616 spin_lock(&req->rq_lock);
1617 if (likely(req->rq_unstable)) {
1618 req->rq_unstable = 0;
1619 spin_unlock(&req->rq_lock);
1621 osc_dec_unstable_pages(req);
1623 req->rq_committed = 1;
1624 spin_unlock(&req->rq_lock);
1629  * Build an RPC from the list of extents @ext_list. The caller must ensure
1630  * that the total number of pages in this list is NOT over max pages per RPC.
1631 * Extents in the list must be in OES_RPC state.
1633 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1634 struct list_head *ext_list, int cmd)
1636 struct ptlrpc_request *req = NULL;
1637 struct osc_extent *ext;
1638 struct brw_page **pga = NULL;
1639 struct osc_brw_async_args *aa = NULL;
1640 struct obdo *oa = NULL;
1641 struct osc_async_page *oap;
1642 struct osc_object *obj = NULL;
1643 struct cl_req_attr *crattr = NULL;
1644 loff_t starting_offset = OBD_OBJECT_EOF;
1645 loff_t ending_offset = 0;
1649 bool soft_sync = false;
1650 bool interrupted = false;
1653 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1654 struct ost_body *body;
1656 LASSERT(!list_empty(ext_list));
1658 /* add pages into rpc_list to build BRW rpc */
1659 list_for_each_entry(ext, ext_list, oe_link) {
1660 LASSERT(ext->oe_state == OES_RPC);
1661 mem_tight |= ext->oe_memalloc;
1662 page_count += ext->oe_nr_pages;
1667 soft_sync = osc_over_unstable_soft_limit(cli);
1669 mpflag = cfs_memory_pressure_get_and_set();
1671 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1673 GOTO(out, rc = -ENOMEM);
1677 GOTO(out, rc = -ENOMEM);
1680 list_for_each_entry(ext, ext_list, oe_link) {
1681 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1683 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1685 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1686 pga[i] = &oap->oap_brw_page;
1687 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1690 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1691 if (starting_offset == OBD_OBJECT_EOF ||
1692 starting_offset > oap->oap_obj_off)
1693 starting_offset = oap->oap_obj_off;
1695 LASSERT(oap->oap_page_off == 0);
1696 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1697 ending_offset = oap->oap_obj_off +
1700 LASSERT(oap->oap_page_off + oap->oap_count ==
1702 if (oap->oap_interrupted)
1707 /* first page in the list */
1708 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1710 crattr = &osc_env_info(env)->oti_req_attr;
1711 memset(crattr, 0, sizeof(*crattr));
1712 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1713 crattr->cra_flags = ~0ULL;
1714 crattr->cra_page = oap2cl_page(oap);
1715 crattr->cra_oa = oa;
1716 cl_req_attr_set(env, osc2cl(obj), crattr);
1718 sort_brw_pages(pga, page_count);
1719 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1721 CERROR("prep_req failed: %d\n", rc);
1725 req->rq_commit_cb = brw_commit;
1726 req->rq_interpret_reply = brw_interpret;
1727 req->rq_memalloc = mem_tight != 0;
1728 oap->oap_request = ptlrpc_request_addref(req);
1729 if (interrupted && !req->rq_intr)
1730 ptlrpc_mark_interrupted(req);
1732 /* Need to update the timestamps after the request is built in case
1733 	 * we race with setattr (locally or in the queue at the OST). If the OST gets
1734 	 * a later setattr before an earlier BRW (as determined by the request xid),
1735 * the OST will not use BRW timestamps. Sadly, there is no obvious
1736 * way to do this in a single call. bug 10150 */
1737 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1738 crattr->cra_oa = &body->oa;
1739 crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1740 cl_req_attr_set(env, osc2cl(obj), crattr);
1741 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1743 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1744 aa = ptlrpc_req_async_args(req);
1745 INIT_LIST_HEAD(&aa->aa_oaps);
1746 list_splice_init(&rpc_list, &aa->aa_oaps);
1747 INIT_LIST_HEAD(&aa->aa_exts);
1748 list_splice_init(ext_list, &aa->aa_exts);
1750 spin_lock(&cli->cl_loi_list_lock);
1751 starting_offset >>= PAGE_CACHE_SHIFT;
1752 if (cmd == OBD_BRW_READ) {
1753 cli->cl_r_in_flight++;
1754 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1755 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1756 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1757 starting_offset + 1);
1759 cli->cl_w_in_flight++;
1760 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1761 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1762 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1763 starting_offset + 1);
1765 spin_unlock(&cli->cl_loi_list_lock);
1767 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1768 page_count, aa, cli->cl_r_in_flight,
1769 cli->cl_w_in_flight);
1771 ptlrpcd_add_req(req);
1777 cfs_memory_pressure_restore(mpflag);
1780 LASSERT(req == NULL);
1785 OBD_FREE(pga, sizeof(*pga) * page_count);
1786 /* this should happen rarely and is pretty bad, it makes the
1787 * pending list not follow the dirty order */
1788 while (!list_empty(ext_list)) {
1789 ext = list_entry(ext_list->next, struct osc_extent,
1791 list_del_init(&ext->oe_link);
1792 osc_extent_finish(env, ext, 0, rc);
1798 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1799 struct ldlm_enqueue_info *einfo)
1801 void *data = einfo->ei_cbdata;
1804 LASSERT(lock != NULL);
1805 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1806 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1807 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1808 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1810 lock_res_and_lock(lock);
1812 if (lock->l_ast_data == NULL)
1813 lock->l_ast_data = data;
1814 if (lock->l_ast_data == data)
1817 unlock_res_and_lock(lock);
1822 static int osc_set_data_with_check(struct lustre_handle *lockh,
1823 struct ldlm_enqueue_info *einfo)
1825 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1829 set = osc_set_lock_data_with_check(lock, einfo);
1830 LDLM_LOCK_PUT(lock);
1832 CERROR("lockh %p, data %p - client evicted?\n",
1833 lockh, einfo->ei_cbdata);
1837 static int osc_enqueue_fini(struct ptlrpc_request *req,
1838 osc_enqueue_upcall_f upcall, void *cookie,
1839 struct lustre_handle *lockh, enum ldlm_mode mode,
1840 __u64 *flags, int agl, int errcode)
1842 bool intent = *flags & LDLM_FL_HAS_INTENT;
1846 /* The request was created before ldlm_cli_enqueue call. */
1847 if (intent && errcode == ELDLM_LOCK_ABORTED) {
1848 struct ldlm_reply *rep;
1850 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1851 LASSERT(rep != NULL);
1853 rep->lock_policy_res1 =
1854 ptlrpc_status_ntoh(rep->lock_policy_res1);
1855 if (rep->lock_policy_res1)
1856 errcode = rep->lock_policy_res1;
1858 *flags |= LDLM_FL_LVB_READY;
1859 } else if (errcode == ELDLM_OK) {
1860 *flags |= LDLM_FL_LVB_READY;
1863 /* Call the update callback. */
1864 rc = (*upcall)(cookie, lockh, errcode);
1866 /* release the reference taken in ldlm_cli_enqueue() */
1867 if (errcode == ELDLM_LOCK_MATCHED)
1869 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1870 ldlm_lock_decref(lockh, mode);
1875 static int osc_enqueue_interpret(const struct lu_env *env,
1876 struct ptlrpc_request *req,
1877 struct osc_enqueue_args *aa, int rc)
1879 struct ldlm_lock *lock;
1880 struct lustre_handle *lockh = &aa->oa_lockh;
1881 enum ldlm_mode mode = aa->oa_mode;
1882 struct ost_lvb *lvb = aa->oa_lvb;
1883 __u32 lvb_len = sizeof(*lvb);
1888 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
1890 lock = ldlm_handle2lock(lockh);
1891 LASSERTF(lock != NULL,
1892 "lockh "LPX64", req %p, aa %p - client evicted?\n",
1893 lockh->cookie, req, aa);
1895 /* Take an additional reference so that a blocking AST that
1896 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
1897 * to arrive after an upcall has been executed by
1898 * osc_enqueue_fini(). */
1899 ldlm_lock_addref(lockh, mode);
1901 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
1902 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
1904 	/* Let the CP AST grant the lock first. */
1905 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
1908 LASSERT(aa->oa_lvb == NULL);
1909 LASSERT(aa->oa_flags == NULL);
1910 aa->oa_flags = &flags;
1913 /* Complete obtaining the lock procedure. */
1914 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
1915 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
1917 /* Complete osc stuff. */
1918 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
1919 aa->oa_flags, aa->oa_agl, rc);
1921 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
1923 ldlm_lock_decref(lockh, mode);
1924 LDLM_LOCK_PUT(lock);
1928 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
1930 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
1931 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
1932 * other synchronous requests, however keeping some locks and trying to obtain
1933 * others may take a considerable amount of time in a case of ost failure; and
1934  * when other sync requests do not get a lock released by a client, the client
1935  * is evicted from the cluster -- such scenarios make life difficult, so
1936 * release locks just after they are obtained. */
1937 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
1938 __u64 *flags, union ldlm_policy_data *policy,
1939 struct ost_lvb *lvb, int kms_valid,
1940 osc_enqueue_upcall_f upcall, void *cookie,
1941 struct ldlm_enqueue_info *einfo,
1942 struct ptlrpc_request_set *rqset, int async, int agl)
1944 struct obd_device *obd = exp->exp_obd;
1945 struct lustre_handle lockh = { 0 };
1946 struct ptlrpc_request *req = NULL;
1947 int intent = *flags & LDLM_FL_HAS_INTENT;
1948 __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
1949 enum ldlm_mode mode;
1953 /* Filesystem lock extents are extended to page boundaries so that
1954 * dealing with the page cache is a little smoother. */
1955 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
1956 policy->l_extent.end |= ~PAGE_MASK;
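/*
 * Example with 4 KiB pages (so ~PAGE_MASK == 0xfff): a byte range of
 * [0x1234, 0x5678] is widened to [0x1000, 0x5fff], i.e. whole pages.
 */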
1959 * kms is not valid when either object is completely fresh (so that no
1960 * locks are cached), or object was evicted. In the latter case cached
1961 * lock cannot be used, because it would prime inode state with
1962 * potentially stale LVB.
1967 /* Next, search for already existing extent locks that will cover us */
1968 /* If we're trying to read, we also search for an existing PW lock. The
1969 * VFS and page cache already protect us locally, so lots of readers/
1970 * writers can share a single PW lock.
1972 * There are problems with conversion deadlocks, so instead of
1973 * converting a read lock to a write lock, we'll just enqueue a new
1976 * At some point we should cancel the read lock instead of making them
1977 * send us a blocking callback, but there are problems with canceling
1978 * locks out from other users right now, too. */
1979 mode = einfo->ei_mode;
1980 if (einfo->ei_mode == LCK_PR)
1982 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
1983 einfo->ei_type, policy, mode, &lockh, 0);
1985 struct ldlm_lock *matched;
1987 if (*flags & LDLM_FL_TEST_LOCK)
1990 matched = ldlm_handle2lock(&lockh);
1992 			/* AGL enqueues DLM locks speculatively. Therefore, if
1993 			 * a DLM lock already exists, it will just inform the
1994 			 * caller to cancel the AGL process for this stripe. */
1995 ldlm_lock_decref(&lockh, mode);
1996 LDLM_LOCK_PUT(matched);
1998 } else if (osc_set_lock_data_with_check(matched, einfo)) {
1999 *flags |= LDLM_FL_LVB_READY;
2001 /* We already have a lock, and it's referenced. */
2002 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2004 ldlm_lock_decref(&lockh, mode);
2005 LDLM_LOCK_PUT(matched);
2008 ldlm_lock_decref(&lockh, mode);
2009 LDLM_LOCK_PUT(matched);
2014 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2018 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2019 &RQF_LDLM_ENQUEUE_LVB);
2023 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2025 ptlrpc_request_free(req);
2029 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2031 ptlrpc_request_set_replen(req);
2034 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2035 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2037 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2038 sizeof(*lvb), LVB_T_OST, &lockh, async);
2041 struct osc_enqueue_args *aa;
2042 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2043 aa = ptlrpc_req_async_args(req);
2045 aa->oa_mode = einfo->ei_mode;
2046 aa->oa_type = einfo->ei_type;
2047 lustre_handle_copy(&aa->oa_lockh, &lockh);
2048 aa->oa_upcall = upcall;
2049 aa->oa_cookie = cookie;
2052 aa->oa_flags = flags;
2055 			/* AGL essentially enqueues a DLM lock
2056 * in advance, so we don't care about the
2057 * result of AGL enqueue. */
2059 aa->oa_flags = NULL;
2062 req->rq_interpret_reply =
2063 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2064 if (rqset == PTLRPCD_SET)
2065 ptlrpcd_add_req(req);
2067 ptlrpc_set_add_req(rqset, req);
2068 } else if (intent) {
2069 ptlrpc_req_finished(req);
2074 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2077 ptlrpc_req_finished(req);
2082 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2083 enum ldlm_type type, union ldlm_policy_data *policy,
2084 enum ldlm_mode mode, __u64 *flags, void *data,
2085 struct lustre_handle *lockh, int unref)
2087 struct obd_device *obd = exp->exp_obd;
2088 __u64 lflags = *flags;
2092 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2095 /* Filesystem lock extents are extended to page boundaries so that
2096 * dealing with the page cache is a little smoother */
2097 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2098 policy->l_extent.end |= ~PAGE_MASK;
2100 /* Next, search for already existing extent locks that will cover us */
2101 /* If we're trying to read, we also search for an existing PW lock. The
2102 * VFS and page cache already protect us locally, so lots of readers/
2103 * writers can share a single PW lock. */
2107 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2108 res_id, type, policy, rc, lockh, unref);
2111 if (!osc_set_data_with_check(lockh, data)) {
2112 if (!(lflags & LDLM_FL_TEST_LOCK))
2113 ldlm_lock_decref(lockh, rc);
2117 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2118 ldlm_lock_addref(lockh, LCK_PR);
2119 ldlm_lock_decref(lockh, LCK_PW);
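/*
 * Reply handler for the asynchronous OST_STATFS RPC issued below: on success
 * it copies the server's obd_statfs reply into the caller's oi_osfs buffer
 * and completes the operation through the oi_cb_up() callback.  Requests that
 * were never sent (a problem detected at a higher level such as LOV) and
 * -ENOTCONN/-EAGAIN failures of OBD_STATFS_NODELAY requests are handled as
 * special cases, as the comments inside describe.
 */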
2126 static int osc_statfs_interpret(const struct lu_env *env,
2127 struct ptlrpc_request *req,
2128 struct osc_async_args *aa, int rc)
2130 struct obd_statfs *msfs;
2134 /* The request has in fact never been sent
2135 * due to issues at a higher level (LOV).
2136 * Exit immediately since the caller is
2137 * aware of the problem and takes care
2138 * of the cleanup. */
2141 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2142 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2148 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2150 GOTO(out, rc = -EPROTO);
2153 *aa->aa_oi->oi_osfs = *msfs;
2155 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
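/*
 * osc_statfs_async(): send OST_STATFS without blocking the caller.  The
 * request is packed, aimed at OST_CREATE_PORTAL and added to @rqset;
 * osc_statfs_interpret() above fills in oinfo->oi_osfs when the reply
 * arrives.  OBD_STATFS_NODELAY requests are marked no-resend/no-delay so a
 * statfs issued from procfs fails fast instead of waiting for a
 * disconnected import to recover.
 */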
2159 static int osc_statfs_async(struct obd_export *exp,
2160 struct obd_info *oinfo, __u64 max_age,
2161 struct ptlrpc_request_set *rqset)
2163 struct obd_device *obd = class_exp2obd(exp);
2164 struct ptlrpc_request *req;
2165 struct osc_async_args *aa;
2169 /* We could possibly pass max_age in the request (as an absolute
2170 * timestamp or a "seconds.usec ago") so the target can avoid doing
2171 * extra calls into the filesystem if that isn't necessary (e.g.
2172 * during mount that would help a bit). Having relative timestamps
2173 * is not so great if request processing is slow, while absolute
2174 * timestamps are not ideal because they need time synchronization. */
2175 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2179 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2181 ptlrpc_request_free(req);
2184 ptlrpc_request_set_replen(req);
2185 req->rq_request_portal = OST_CREATE_PORTAL;
2186 ptlrpc_at_set_req_timeout(req);
2188 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2189 /* procfs requests should not wait for a reply, to avoid deadlock */
2190 req->rq_no_resend = 1;
2191 req->rq_no_delay = 1;
2194 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2195 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2196 aa = ptlrpc_req_async_args(req);
2199 ptlrpc_set_add_req(rqset, req);
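/*
 * Synchronous variant of the statfs path: take a reference on the import
 * under cl_sem (the request may come from lprocfs while the client is being
 * disconnected), send OST_STATFS with ptlrpc_queue_wait() and copy the
 * server's obd_statfs reply into @osfs.
 */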
2203 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2204 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2206 struct obd_device *obd = class_exp2obd(exp);
2207 struct obd_statfs *msfs;
2208 struct ptlrpc_request *req;
2209 struct obd_import *imp = NULL;
2213 /* Since the request might also come from lprocfs, we need to
2214 * synchronize this with client_disconnect_export() (Bug 15684). */
2215 down_read(&obd->u.cli.cl_sem);
2216 if (obd->u.cli.cl_import)
2217 imp = class_import_get(obd->u.cli.cl_import);
2218 up_read(&obd->u.cli.cl_sem);
2222 /* We could possibly pass max_age in the request (as an absolute
2223 * timestamp or a "seconds.usec ago") so the target can avoid doing
2224 * extra calls into the filesystem if that isn't necessary (e.g.
2225 * during mount that would help a bit). Having relative timestamps
2226 * is not so great if request processing is slow, while absolute
2227 * timestamps are not ideal because they need time synchronization. */
2228 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2230 class_import_put(imp);
2235 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2237 ptlrpc_request_free(req);
2240 ptlrpc_request_set_replen(req);
2241 req->rq_request_portal = OST_CREATE_PORTAL;
2242 ptlrpc_at_set_req_timeout(req);
2244 if (flags & OBD_STATFS_NODELAY) {
2245 /* procfs requests should not wait for a reply, to avoid deadlock */
2246 req->rq_no_resend = 1;
2247 req->rq_no_delay = 1;
2250 rc = ptlrpc_queue_wait(req);
2254 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2256 GOTO(out, rc = -EPROTO);
2263 ptlrpc_req_finished(req);
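/*
 * ioctl entry point for the OSC device.  Only a small set of commands is
 * handled here: OBD_IOC_CLIENT_RECOVER (force import recovery),
 * IOC_OSC_SET_ACTIVE (activate or deactivate the import) and
 * OBD_IOC_PING_TARGET; anything else fails with -ENOTTY.  A module
 * reference is held for the duration of the call.
 */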
2267 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2268 void *karg, void *uarg)
2270 struct obd_device *obd = exp->exp_obd;
2271 struct obd_ioctl_data *data = karg;
2275 if (!try_module_get(THIS_MODULE)) {
2276 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2277 module_name(THIS_MODULE));
2281 case OBD_IOC_CLIENT_RECOVER:
2282 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2283 data->ioc_inlbuf1, 0);
2287 case IOC_OSC_SET_ACTIVE:
2288 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2291 case OBD_IOC_PING_TARGET:
2292 err = ptlrpc_obd_ping(obd);
2295 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2296 cmd, current_comm());
2297 GOTO(out, err = -ENOTTY);
2300 module_put(THIS_MODULE);
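/*
 * Handle obd_set_info_async() keys.  Checksum, sptlrpc and cache related
 * keys (KEY_CHECKSUM, KEY_SPTLRPC_CONF, KEY_FLUSH_CTX, KEY_CACHE_SET,
 * KEY_CACHE_LRU_SHRINK) are processed locally; remaining keys are forwarded
 * to the OST in an OST_SET_INFO RPC, with KEY_GRANT_SHRINK using the
 * RQF_OST_SET_GRANT_INFO request format and being sent through ptlrpcd.
 */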
2304 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2305 u32 keylen, void *key,
2306 u32 vallen, void *val,
2307 struct ptlrpc_request_set *set)
2309 struct ptlrpc_request *req;
2310 struct obd_device *obd = exp->exp_obd;
2311 struct obd_import *imp = class_exp2cliimp(exp);
2316 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2318 if (KEY_IS(KEY_CHECKSUM)) {
2319 if (vallen != sizeof(int))
2321 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2325 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2326 sptlrpc_conf_client_adapt(obd);
2330 if (KEY_IS(KEY_FLUSH_CTX)) {
2331 sptlrpc_import_flush_my_ctx(imp);
2335 if (KEY_IS(KEY_CACHE_SET)) {
2336 struct client_obd *cli = &obd->u.cli;
2338 LASSERT(cli->cl_cache == NULL); /* only once */
2339 cli->cl_cache = (struct cl_client_cache *)val;
2340 cl_cache_incref(cli->cl_cache);
2341 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2343 /* add this osc to the LRU list of the shared client cache */
2344 LASSERT(list_empty(&cli->cl_lru_osc));
2345 spin_lock(&cli->cl_cache->ccc_lru_lock);
2346 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2347 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2352 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2353 struct client_obd *cli = &obd->u.cli;
2354 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2355 long target = *(long *)val;
2357 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2362 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2365 /* We pass all other commands directly to the OST. Since nobody calls OSC
2366 methods directly and everybody is supposed to go through LOV, we
2367 assume LOV checked invalid values for us.
2368 The only recognised values so far are evict_by_nid and mds_conn.
2369 Even if something bad goes through, we'd get a -EINVAL from the OST
2370 anyway. */
2372 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2373 &RQF_OST_SET_GRANT_INFO :
2378 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2379 RCL_CLIENT, keylen);
2380 if (!KEY_IS(KEY_GRANT_SHRINK))
2381 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2382 RCL_CLIENT, vallen);
2383 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2385 ptlrpc_request_free(req);
2389 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2390 memcpy(tmp, key, keylen);
2391 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2394 memcpy(tmp, val, vallen);
2396 if (KEY_IS(KEY_GRANT_SHRINK)) {
2397 struct osc_grant_args *aa;
2400 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2401 aa = ptlrpc_req_async_args(req);
2404 ptlrpc_req_finished(req);
2407 *oa = ((struct ost_body *)val)->oa;
2409 req->rq_interpret_reply = osc_shrink_grant_interpret;
2412 ptlrpc_request_set_replen(req);
2413 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2414 LASSERT(set != NULL);
2415 ptlrpc_set_add_req(set, req);
2416 ptlrpc_check_set(NULL, set);
2418 ptlrpcd_add_req(req);
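/*
 * On reconnect, advertise to the server how much grant this client wants:
 * the still-available grant plus the grant covering currently dirty pages,
 * or twice the BRW size if that sum is zero.  The accumulated lost grant is
 * logged and then reset.
 */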
2424 static int osc_reconnect(const struct lu_env *env,
2425 struct obd_export *exp, struct obd_device *obd,
2426 struct obd_uuid *cluuid,
2427 struct obd_connect_data *data,
2430 struct client_obd *cli = &obd->u.cli;
2432 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2435 spin_lock(&cli->cl_loi_list_lock);
2436 data->ocd_grant = (cli->cl_avail_grant +
2437 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2438 2 * cli_brw_size(obd);
2439 lost_grant = cli->cl_lost_grant;
2440 cli->cl_lost_grant = 0;
2441 spin_unlock(&cli->cl_loi_list_lock);
2443 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2444 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2445 data->ocd_version, data->ocd_grant, lost_grant);
2451 static int osc_disconnect(struct obd_export *exp)
2453 struct obd_device *obd = class_exp2obd(exp);
2456 rc = client_disconnect_export(exp);
2458 * Initially we put del_shrink_grant before disconnect_export, but that
2459 * caused the following problem when setup (connect) and cleanup
2460 * (disconnect) were tangled together:
2461 * connect p1 disconnect p2
2462 * ptlrpc_connect_import
2463 * ............... class_manual_cleanup
2466 * ptlrpc_connect_interrupt
2468 * add this client to shrink list
2470 * Bang! The pinger triggers the shrink.
2471 * So the osc should be removed from the shrink list only after we
2472 * are sure the import has been destroyed. (BUG 18662)
2474 if (obd->u.cli.cl_import == NULL)
2475 osc_del_shrink_grant(&obd->u.cli);
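/*
 * cfs_hash iterator used when the import is invalidated: for each LDLM
 * resource, clear LDLM_FL_CLEANED on every granted lock (so the second
 * ldlm_namespace_cleanup() pass in osc_import_event() will cancel it) and
 * invalidate the osc_object attached to the locks via l_ast_data.
 */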
2479 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2480 struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2482 struct lu_env *env = arg;
2483 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2484 struct ldlm_lock *lock;
2485 struct osc_object *osc = NULL;
2489 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2490 if (lock->l_ast_data != NULL && osc == NULL) {
2491 osc = lock->l_ast_data;
2492 cl_object_get(osc2cl(osc));
2495 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2496 * by the 2nd round of ldlm_namespace_clean() call in
2497 * osc_import_event(). */
2498 ldlm_clear_cleaned(lock);
2503 osc_object_invalidate(env, osc);
2504 cl_object_put(env, osc2cl(osc));
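/*
 * React to import state changes: on disconnect the grant counters are
 * zeroed; on invalidation the namespace is cleaned, cached I/O is flushed
 * via osc_io_unplug() and every resource is run through
 * osc_ldlm_resource_invalidate() above; on OCD the grant is re-initialized
 * and the request portal is switched to OST_REQUEST_PORTAL when the server
 * supports it.  Most events are also propagated to the observer (typically
 * LOV) through obd_notify_observer().
 */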
2510 static int osc_import_event(struct obd_device *obd,
2511 struct obd_import *imp,
2512 enum obd_import_event event)
2514 struct client_obd *cli;
2518 LASSERT(imp->imp_obd == obd);
2521 case IMP_EVENT_DISCON: {
2523 spin_lock(&cli->cl_loi_list_lock);
2524 cli->cl_avail_grant = 0;
2525 cli->cl_lost_grant = 0;
2526 spin_unlock(&cli->cl_loi_list_lock);
2529 case IMP_EVENT_INACTIVE: {
2530 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2533 case IMP_EVENT_INVALIDATE: {
2534 struct ldlm_namespace *ns = obd->obd_namespace;
2538 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2540 env = cl_env_get(&refcheck);
2542 osc_io_unplug(env, &obd->u.cli, NULL);
2544 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2545 osc_ldlm_resource_invalidate,
2547 cl_env_put(env, &refcheck);
2549 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2554 case IMP_EVENT_ACTIVE: {
2555 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2558 case IMP_EVENT_OCD: {
2559 struct obd_connect_data *ocd = &imp->imp_connect_data;
2561 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2562 osc_init_grant(&obd->u.cli, ocd);
2565 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2566 imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
2568 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2571 case IMP_EVENT_DEACTIVATE: {
2572 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2575 case IMP_EVENT_ACTIVATE: {
2576 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2580 CERROR("Unknown import event %d\n", event);
2587 * Determine whether the lock can be canceled before replaying the lock
2588 * during recovery, see bug16774 for detailed information.
2590 * \retval zero the lock can't be canceled
2591 * \retval other ok to cancel
2593 static int osc_cancel_weight(struct ldlm_lock *lock)
2596 * Cancel all unused and granted extent locks.
2598 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2599 lock->l_granted_mode == lock->l_req_mode &&
2600 osc_ldlm_weigh_ast(lock) == 0)
2606 static int brw_queue_work(const struct lu_env *env, void *data)
2608 struct client_obd *cli = data;
2610 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2612 osc_io_unplug(env, cli, NULL);
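/*
 * Device setup: after the generic client_obd_setup() this creates the
 * ptlrpcd work items used for writeback and LRU shrinking, sets up quota
 * and procfs (attaching under the OSP's typ_procsym when osc and osp share
 * a node), tops up the shared OSC request pool, registers the early lock
 * cancel weight callback and adds the client to the global shrink list.
 */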
2616 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2618 struct client_obd *cli = &obd->u.cli;
2619 struct obd_type *type;
2627 rc = ptlrpcd_addref();
2631 rc = client_obd_setup(obd, lcfg);
2633 GOTO(out_ptlrpcd, rc);
2635 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2636 if (IS_ERR(handler))
2637 GOTO(out_client_setup, rc = PTR_ERR(handler));
2638 cli->cl_writeback_work = handler;
2640 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2641 if (IS_ERR(handler))
2642 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2643 cli->cl_lru_work = handler;
2645 rc = osc_quota_setup(obd);
2647 GOTO(out_ptlrpcd_work, rc);
2649 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2651 #ifdef CONFIG_PROC_FS
2652 obd->obd_vars = lprocfs_osc_obd_vars;
2654 /* If this is true then both client (osc) and server (osp) are on the
2655 * same node. The osp layer, if loaded first, will register the osc proc
2656 * directory. In that case this obd_device will attach its proc
2657 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2658 type = class_search_type(LUSTRE_OSP_NAME);
2659 if (type && type->typ_procsym) {
2660 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2662 obd->obd_vars, obd);
2663 if (IS_ERR(obd->obd_proc_entry)) {
2664 rc = PTR_ERR(obd->obd_proc_entry);
2665 CERROR("error %d setting up lprocfs for %s\n", rc,
2667 obd->obd_proc_entry = NULL;
2670 rc = lprocfs_obd_setup(obd);
2673 /* If the basic OSC proc tree construction succeeded then
2674 * let's do the rest. */
2676 lproc_osc_attach_seqstat(obd);
2677 sptlrpc_lprocfs_cliobd_attach(obd);
2678 ptlrpc_lprocfs_register_obd(obd);
2682 * We try to control the total number of requests with an upper limit,
2683 * osc_reqpool_maxreqcount. There might be races that cause
2684 * over-limit allocations, but that is fine.
2686 req_count = atomic_read(&osc_pool_req_count);
2687 if (req_count < osc_reqpool_maxreqcount) {
2688 adding = cli->cl_max_rpcs_in_flight + 2;
2689 if (req_count + adding > osc_reqpool_maxreqcount)
2690 adding = osc_reqpool_maxreqcount - req_count;
2692 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2693 atomic_add(added, &osc_pool_req_count);
2696 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2697 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2699 spin_lock(&osc_shrink_lock);
2700 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2701 spin_unlock(&osc_shrink_lock);
2706 if (cli->cl_writeback_work != NULL) {
2707 ptlrpcd_destroy_work(cli->cl_writeback_work);
2708 cli->cl_writeback_work = NULL;
2710 if (cli->cl_lru_work != NULL) {
2711 ptlrpcd_destroy_work(cli->cl_lru_work);
2712 cli->cl_lru_work = NULL;
2715 client_obd_cleanup(obd);
2721 static int osc_precleanup(struct obd_device *obd)
2723 struct client_obd *cli = &obd->u.cli;
2727 * For the echo client, the export may be on the zombie list; wait for the
2728 * zombie thread to cull it, because cli.cl_import will be
2729 * cleared in client_disconnect_export():
2730 * class_export_destroy() -> obd_cleanup() ->
2731 * echo_device_free() -> echo_client_cleanup() ->
2732 * obd_disconnect() -> osc_disconnect() ->
2733 * client_disconnect_export()
2735 obd_zombie_barrier();
2736 if (cli->cl_writeback_work) {
2737 ptlrpcd_destroy_work(cli->cl_writeback_work);
2738 cli->cl_writeback_work = NULL;
2741 if (cli->cl_lru_work) {
2742 ptlrpcd_destroy_work(cli->cl_lru_work);
2743 cli->cl_lru_work = NULL;
2746 obd_cleanup_client_import(obd);
2747 ptlrpc_lprocfs_unregister_obd(obd);
2748 lprocfs_obd_cleanup(obd);
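/*
 * Final cleanup: remove the client from the global shrink list, detach it
 * from the shared cl_client_cache LRU (dropping the cache reference taken
 * in osc_set_info_async() for KEY_CACHE_SET), free the quota cache and
 * finish with the generic client_obd_cleanup().
 */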
2752 int osc_cleanup(struct obd_device *obd)
2754 struct client_obd *cli = &obd->u.cli;
2759 spin_lock(&osc_shrink_lock);
2760 list_del(&cli->cl_shrink_list);
2761 spin_unlock(&osc_shrink_lock);
2764 if (cli->cl_cache != NULL) {
2765 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2766 spin_lock(&cli->cl_cache->ccc_lru_lock);
2767 list_del_init(&cli->cl_lru_osc);
2768 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2769 cli->cl_lru_left = NULL;
2770 cl_cache_decref(cli->cl_cache);
2771 cli->cl_cache = NULL;
2774 /* free memory of osc quota cache */
2775 osc_quota_cleanup(obd);
2777 rc = client_obd_cleanup(obd);
2783 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2785 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2786 return rc > 0 ? 0 : rc;
2789 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2791 return osc_process_config_base(obd, buf);
2794 static struct obd_ops osc_obd_ops = {
2795 .o_owner = THIS_MODULE,
2796 .o_setup = osc_setup,
2797 .o_precleanup = osc_precleanup,
2798 .o_cleanup = osc_cleanup,
2799 .o_add_conn = client_import_add_conn,
2800 .o_del_conn = client_import_del_conn,
2801 .o_connect = client_connect_import,
2802 .o_reconnect = osc_reconnect,
2803 .o_disconnect = osc_disconnect,
2804 .o_statfs = osc_statfs,
2805 .o_statfs_async = osc_statfs_async,
2806 .o_create = osc_create,
2807 .o_destroy = osc_destroy,
2808 .o_getattr = osc_getattr,
2809 .o_setattr = osc_setattr,
2810 .o_iocontrol = osc_iocontrol,
2811 .o_set_info_async = osc_set_info_async,
2812 .o_import_event = osc_import_event,
2813 .o_process_config = osc_process_config,
2814 .o_quotactl = osc_quotactl,
2817 static struct shrinker *osc_cache_shrinker;
2818 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
2819 DEFINE_SPINLOCK(osc_shrink_lock);
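/*
 * Compatibility wrapper for kernels whose shrinker API uses a single
 * callback instead of separate count/scan methods: build a shrink_control
 * from the old-style arguments, run the scan pass and then report the
 * remaining object count.
 */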
2821 #ifndef HAVE_SHRINKER_COUNT
2822 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2824 struct shrink_control scv = {
2825 .nr_to_scan = shrink_param(sc, nr_to_scan),
2826 .gfp_mask = shrink_param(sc, gfp_mask)
2828 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2829 struct shrinker *shrinker = NULL;
2832 (void)osc_cache_shrink_scan(shrinker, &scv);
2834 return osc_cache_shrink_count(shrinker, &scv);
2838 static int __init osc_init(void)
2840 bool enable_proc = true;
2841 struct obd_type *type;
2842 unsigned int reqpool_size;
2843 unsigned int reqsize;
2845 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
2846 osc_cache_shrink_count, osc_cache_shrink_scan);
2849 /* print an address of _any_ initialized kernel symbol from this
2850 * module, to allow debugging with gdb that doesn't support data
2851 * symbols from modules.*/
2852 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2854 rc = lu_kmem_init(osc_caches);
2858 type = class_search_type(LUSTRE_OSP_NAME);
2859 if (type != NULL && type->typ_procsym != NULL)
2860 enable_proc = false;
2862 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2863 LUSTRE_OSC_NAME, &osc_device_type);
2867 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
2869 /* This is obviously too much memory, only prevent overflow here */
2870 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
2871 GOTO(out_type, rc = -EINVAL);
2873 reqpool_size = osc_reqpool_mem_max << 20;
2876 while (reqsize < OST_IO_MAXREQSIZE)
2877 reqsize = reqsize << 1;
2880 * We don't enlarge the request count in the OSC pool according to
2881 * cl_max_rpcs_in_flight. Allocation from the pool is only
2882 * tried after a normal allocation fails, so a small OSC pool won't
2883 * cause much performance degradation in most cases.
2885 osc_reqpool_maxreqcount = reqpool_size / reqsize;
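/*
 * Worked example (not from the original source): with the default
 * osc_reqpool_mem_max of 5, reqpool_size is 5 << 20 bytes, and reqsize is
 * rounded up by the loop above to the next power of two >= OST_IO_MAXREQSIZE
 * (whose exact value depends on the build).  If, say, reqsize came out to
 * 64KB, the pool limit would be (5 << 20) / 65536 = 80 pre-allocated
 * requests shared by all OSC devices.
 */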
2887 atomic_set(&osc_pool_req_count, 0);
2888 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
2889 ptlrpc_add_rqs_to_pool);
2891 if (osc_rq_pool != NULL)
2895 class_unregister_type(LUSTRE_OSC_NAME);
2897 lu_kmem_fini(osc_caches);
2902 static void /*__exit*/ osc_exit(void)
2904 remove_shrinker(osc_cache_shrinker);
2905 class_unregister_type(LUSTRE_OSC_NAME);
2906 lu_kmem_fini(osc_caches);
2907 ptlrpc_free_rq_pool(osc_rq_pool);
2910 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2911 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2912 MODULE_VERSION(LUSTRE_VERSION_STRING);
2913 MODULE_LICENSE("GPL");
2915 module_init(osc_init);
2916 module_exit(osc_exit);