4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2015, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
41 #include <lustre/lustre_user.h>
43 #include <lprocfs_status.h>
44 #include <lustre_debug.h>
45 #include <lustre_dlm.h>
46 #include <lustre_fid.h>
47 #include <lustre_ha.h>
48 #include <lustre_ioctl.h>
49 #include <lustre_net.h>
50 #include <lustre_obdo.h>
51 #include <lustre_param.h>
53 #include <obd_cksum.h>
54 #include <obd_class.h>
56 #include "osc_cl_internal.h"
57 #include "osc_internal.h"
/* Shared OSC request-pool state: the pool itself, the live count of
 * requests drawn from it, and the configured maximum request count. */
59 atomic_t osc_pool_req_count;
60 unsigned int osc_reqpool_maxreqcount;
61 struct ptlrpc_request_pool *osc_rq_pool;
63 /* max memory used for request pool, unit is MB */
64 static unsigned int osc_reqpool_mem_max = 5;
/* 0444: read-only module parameter, settable only at module load time */
65 module_param(osc_reqpool_mem_max, uint, 0444);
/* Per-request async context for bulk read/write (BRW) RPCs, stored in
 * req->rq_async_args (see CLASSERT in osc_brw_prep_request below).
 * NOTE(review): this excerpt appears truncated — additional fields
 * (e.g. the ones set as aa->aa_requested_nob/aa_nio_count/aa_page_count
 * in osc_brw_prep_request) are not visible here; confirm against the
 * full file. */
67 struct osc_brw_async_args {
73 struct brw_page **aa_ppga;
74 struct client_obd *aa_cli;
75 struct list_head aa_oaps;
76 struct list_head aa_exts;
/* Grant-shrink RPCs reuse the same async-args layout (see
 * osc_shrink_grant_interpret). */
79 #define osc_grant_args osc_brw_async_args
/* Async context for setattr-style RPCs (setattr, punch); sa_upcall is
 * invoked from osc_setattr_interpret when the reply arrives.
 * NOTE(review): struct appears truncated in this excerpt — sa_oa and
 * sa_cookie are referenced by the interpreters below but their
 * declarations are not visible here. */
81 struct osc_setattr_args {
83 obd_enqueue_update_f sa_upcall;
/* Async context for OST_SYNC RPCs; fa_upcall is invoked from
 * osc_sync_interpret.  NOTE(review): excerpt appears truncated —
 * fa_oa and fa_cookie are used below but not visible here. */
87 struct osc_fsync_args {
88 struct osc_object *fa_obj;
90 obd_enqueue_update_f fa_upcall;
/* Async context for DLM lock enqueue requests: export, requested lock
 * type/mode, completion upcall, lock value block and lock handle.
 * oa_agl:1 — flag bit, presumably marks asynchronous glimpse locks
 * (AGL); confirm against the enqueue code in the full file. */
94 struct osc_enqueue_args {
95 struct obd_export *oa_exp;
96 enum ldlm_type oa_type;
97 enum ldlm_mode oa_mode;
99 osc_enqueue_upcall_f oa_upcall;
101 struct ost_lvb *oa_lvb;
102 struct lustre_handle oa_lockh;
103 unsigned int oa_agl:1;
/* Forward declarations for helpers defined later in the file. */
106 static void osc_release_ppga(struct brw_page **ppga, size_t count);
107 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/* Pack @oa into the OST_BODY field of @req's request buffer, converting
 * to wire format according to the import's connect data. */
110 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
112 struct ost_body *body;
114 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
117 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/* Synchronous OST_GETATTR: pack @oa into the request, queue and wait for
 * the reply, then unpack the server's attributes back into @oa.  The
 * preferred I/O blocksize is filled in client-side from the export. */
120 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
123 struct ptlrpc_request *req;
124 struct ost_body *body;
128 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
132 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* pack failed: the partially built request must be freed, not finished */
134 ptlrpc_request_free(req);
138 osc_pack_req_body(req, oa);
140 ptlrpc_request_set_replen(req);
142 rc = ptlrpc_queue_wait(req);
/* missing reply body means the server broke the protocol */
146 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
148 GOTO(out, rc = -EPROTO);
150 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
151 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* blksize is a client-side value, not returned by the OST */
153 oa->o_blksize = cli_brw_size(exp->exp_obd);
154 oa->o_valid |= OBD_MD_FLBLKSZ;
158 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: send @oa's attributes to the OST and unpack
 * the server's view back into @oa.  The caller must have set the object
 * group (OBD_MD_FLGROUP) — enforced by the LASSERT. */
163 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
166 struct ptlrpc_request *req;
167 struct ost_body *body;
171 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
173 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
177 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* pack failed: free the unbuilt request */
179 ptlrpc_request_free(req);
183 osc_pack_req_body(req, oa);
185 ptlrpc_request_set_replen(req);
187 rc = ptlrpc_queue_wait(req);
191 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
193 GOTO(out, rc = -EPROTO);
195 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
199 ptlrpc_req_finished(req);
/* Reply interpreter shared by setattr and punch RPCs: unpack the reply
 * body into sa->sa_oa, then hand the final rc to the caller's upcall. */
204 static int osc_setattr_interpret(const struct lu_env *env,
205 struct ptlrpc_request *req,
206 struct osc_setattr_args *sa, int rc)
208 struct ost_body *body;
214 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
216 GOTO(out, rc = -EPROTO);
218 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
/* always run the upcall, even on error, so the caller can clean up */
221 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR: build the request, attach
 * osc_setattr_interpret as the reply handler, and either hand the
 * request to ptlrpcd (NULL or PTLRPCD_SET rqset) or add it to the
 * caller's request set.  @upcall/@cookie are invoked on completion. */
225 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
226 obd_enqueue_update_f upcall, void *cookie,
227 struct ptlrpc_request_set *rqset)
229 struct ptlrpc_request *req;
230 struct osc_setattr_args *sa;
235 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
239 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
241 ptlrpc_request_free(req);
245 osc_pack_req_body(req, oa);
247 ptlrpc_request_set_replen(req);
249 /* do mds to ost setattr asynchronously */
251 /* Do not wait for response. */
252 ptlrpcd_add_req(req);
/* cast needed because the interpreter takes osc_setattr_args* rather
 * than the generic void* argument of ptlrpc_interpterer_t */
254 req->rq_interpret_reply =
255 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* async args live inside the request; assert they fit */
257 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
258 sa = ptlrpc_req_async_args(req);
260 sa->sa_upcall = upcall;
261 sa->sa_cookie = cookie;
263 if (rqset == PTLRPCD_SET)
264 ptlrpcd_add_req(req);
266 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_CREATE.  Only used for echo-client objects — the
 * LASSERT on fid_seq_is_echo enforces that regular objects are created
 * elsewhere (by the MDT, not through this path). */
272 static int osc_create(const struct lu_env *env, struct obd_export *exp,
275 struct ptlrpc_request *req;
276 struct ost_body *body;
281 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
282 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
284 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
286 GOTO(out, rc = -ENOMEM);
288 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
290 ptlrpc_request_free(req);
294 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
297 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
299 ptlrpc_request_set_replen(req);
301 rc = ptlrpc_queue_wait(req);
305 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
307 GOTO(out_req, rc = -EPROTO);
309 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
310 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* blksize is a client-side value, not returned by the OST */
312 oa->o_blksize = cli_brw_size(exp->exp_obd);
313 oa->o_valid |= OBD_MD_FLBLKSZ;
315 CDEBUG(D_HA, "transno: "LPD64"\n",
316 lustre_msg_get_transno(req->rq_repmsg));
318 ptlrpc_req_finished(req);
/* Asynchronous OST_PUNCH (truncate/hole-punch).  Reuses
 * osc_setattr_interpret/osc_setattr_args for reply handling; the punch
 * range travels inside @oa.  Routed to the OST I/O portal so it is
 * serviced by I/O threads (bug 7198). */
323 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
324 obd_enqueue_update_f upcall, void *cookie,
325 struct ptlrpc_request_set *rqset)
327 struct ptlrpc_request *req;
328 struct osc_setattr_args *sa;
329 struct ost_body *body;
333 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
337 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
339 ptlrpc_request_free(req);
342 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
343 ptlrpc_at_set_req_timeout(req);
345 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
347 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
349 ptlrpc_request_set_replen(req);
351 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
352 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
353 sa = ptlrpc_req_async_args(req);
355 sa->sa_upcall = upcall;
356 sa->sa_cookie = cookie;
357 if (rqset == PTLRPCD_SET)
358 ptlrpcd_add_req(req);
360 ptlrpc_set_add_req(rqset, req);
/* Reply interpreter for OST_SYNC: copy the returned obdo to the caller,
 * refresh the osc object's cached blocks attribute under the attr lock,
 * then invoke the caller's upcall with the final rc. */
365 static int osc_sync_interpret(const struct lu_env *env,
366 struct ptlrpc_request *req,
369 struct osc_fsync_args *fa = arg;
370 struct ost_body *body;
371 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
372 unsigned long valid = 0;
373 struct cl_object *obj;
379 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
381 CERROR("can't unpack ost_body\n");
382 GOTO(out, rc = -EPROTO);
385 *fa->fa_oa = body->oa;
386 obj = osc2cl(fa->fa_obj);
388 /* Update osc object's blocks attribute */
389 cl_object_attr_lock(obj);
390 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
391 attr->cat_blocks = body->oa.o_blocks;
396 cl_object_attr_update(env, obj, attr, valid);
397 cl_object_attr_unlock(obj);
/* upcall runs regardless of rc so the waiter is always released */
400 rc = fa->fa_upcall(fa->fa_cookie, rc);
/* Asynchronous OST_SYNC for @obj.  The sync range is carried in @oa's
 * size/blocks fields (see the comment below); completion is handled by
 * osc_sync_interpret, which then calls @upcall(@cookie, rc). */
404 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
405 obd_enqueue_update_f upcall, void *cookie,
406 struct ptlrpc_request_set *rqset)
408 struct obd_export *exp = osc_export(obj);
409 struct ptlrpc_request *req;
410 struct ost_body *body;
411 struct osc_fsync_args *fa;
415 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
419 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
421 ptlrpc_request_free(req);
425 /* overload the size and blocks fields in the oa with start/end */
426 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
428 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
430 ptlrpc_request_set_replen(req);
431 req->rq_interpret_reply = osc_sync_interpret;
433 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
434 fa = ptlrpc_req_async_args(req);
437 fa->fa_upcall = upcall;
438 fa->fa_cookie = cookie;
440 if (rqset == PTLRPCD_SET)
441 ptlrpcd_add_req(req);
443 ptlrpc_set_add_req(rqset, req);
448 /* Find and cancel locally locks matched by @mode in the resource found by
449 * @objid. Found locks are added into @cancel list. Returns the amount of
450 * locks added to @cancels list. */
451 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
452 struct list_head *cancels,
453 enum ldlm_mode mode, __u64 lock_flags)
455 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
456 struct ldlm_res_id res_id;
457 struct ldlm_resource *res;
461 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
462 * export) but disabled through procfs (flag in NS).
464 * This distinguishes from a case when ELC is not supported originally,
465 * when we still want to cancel locks in advance and just cancel them
466 * locally, without sending any RPC. */
467 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* build the DLM resource name from the object id and look it up */
470 ostid_build_res_name(&oa->o_oi, &res_id);
471 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* hold a debug ref across the cancel scan, then drop our resource ref */
475 LDLM_RESOURCE_ADDREF(res);
476 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
477 lock_flags, 0, NULL);
478 LDLM_RESOURCE_DELREF(res);
479 ldlm_resource_putref(res);
/* Reply interpreter for OST_DESTROY: drop the in-flight destroy count
 * and wake any thread throttled in osc_destroy waiting to send. */
483 static int osc_destroy_interpret(const struct lu_env *env,
484 struct ptlrpc_request *req, void *data,
487 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
489 atomic_dec(&cli->cl_destroy_in_flight);
490 wake_up(&cli->cl_destroy_waitq);
/* Throttle destroy RPCs to cl_max_rpcs_in_flight: optimistically take a
 * slot with atomic_inc_return; if that overshoots the limit, give the
 * slot back.  The inc/dec pair is deliberately lock-free, so the
 * re-check after atomic_dec_return wakes the waitq in case another
 * thread raced in between (see the comment in the original). */
494 static int osc_can_send_destroy(struct client_obd *cli)
496 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
497 cli->cl_max_rpcs_in_flight) {
498 /* The destroy request can be sent */
501 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
502 cli->cl_max_rpcs_in_flight) {
504 * The counter has been modified between the two atomic
507 wake_up(&cli->cl_destroy_waitq);
/* OST_DESTROY: cancel matching local PW locks first (early lock cancel,
 * piggybacked on the destroy via ldlm_prep_elc_req), throttle against
 * cl_max_rpcs_in_flight, then hand the request to ptlrpcd without
 * waiting for the reply. */
512 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
515 struct client_obd *cli = &exp->exp_obd->u.cli;
516 struct ptlrpc_request *req;
517 struct ost_body *body;
518 struct list_head cancels = LIST_HEAD_INIT(cancels);
523 CDEBUG(D_INFO, "oa NULL\n");
/* LDLM_FL_DISCARD_DATA: pages under these locks are being destroyed,
 * no need to write them back */
527 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
528 LDLM_FL_DISCARD_DATA);
530 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* allocation failed: release the locks collected for cancellation */
532 ldlm_lock_list_put(&cancels, l_bl_ast, count);
536 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
539 ptlrpc_request_free(req);
543 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
544 ptlrpc_at_set_req_timeout(req);
546 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
548 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
550 ptlrpc_request_set_replen(req);
552 req->rq_interpret_reply = osc_destroy_interpret;
553 if (!osc_can_send_destroy(cli)) {
554 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
557 * Wait until the number of on-going destroy RPCs drops
558 * under max_rpc_in_flight
560 l_wait_event_exclusive(cli->cl_destroy_waitq,
561 osc_can_send_destroy(cli), &lwi);
564 /* Do not wait for response */
565 ptlrpcd_add_req(req);
/* Fill @oa's dirty/undirty/grant/dropped fields with the client's cache
 * accounting so every RPC announces the current grant state to the OST.
 * All counters are read/updated under cl_loi_list_lock; the CERROR
 * branches are sanity checks on accounting invariants, not hard
 * failures. */
569 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
572 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
574 LASSERT(!(oa->o_valid & bits));
577 spin_lock(&cli->cl_loi_list_lock);
/* with GRANT_PARAM the server tracks grant in bytes, otherwise derive
 * it from the dirty page count */
578 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
579 oa->o_dirty = cli->cl_dirty_grant;
581 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
582 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
583 cli->cl_dirty_max_pages)) {
584 CERROR("dirty %lu - %lu > dirty_max %lu\n",
585 cli->cl_dirty_pages, cli->cl_dirty_transit,
586 cli->cl_dirty_max_pages);
588 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
589 atomic_long_read(&obd_dirty_transit_pages) >
590 (long)(obd_max_dirty_pages + 1))) {
591 /* The atomic_read() allowing the atomic_inc() are
592 * not covered by a lock thus they may safely race and trip
593 * this CERROR() unless we add in a small fudge factor (+1). */
594 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
595 cli_name(cli), atomic_long_read(&obd_dirty_pages),
596 atomic_long_read(&obd_dirty_transit_pages),
597 obd_max_dirty_pages);
599 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
601 CERROR("dirty %lu - dirty_max %lu too big???\n",
602 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
605 unsigned long nrpages;
/* ask for enough grant to keep a full pipeline of RPCs dirty */
607 nrpages = cli->cl_max_pages_per_rpc;
608 nrpages *= cli->cl_max_rpcs_in_flight + 1;
609 nrpages = max(nrpages, cli->cl_dirty_max_pages);
610 oa->o_undirty = nrpages << PAGE_CACHE_SHIFT;
611 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
615 /* take extent tax into account when asking for more
617 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
618 cli->cl_max_extent_pages;
619 oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
622 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
623 oa->o_dropped = cli->cl_lost_grant;
624 cli->cl_lost_grant = 0;
625 spin_unlock(&cli->cl_loi_list_lock);
626 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
627 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant-shrink check cl_grant_shrink_interval from
 * now (see osc_should_shrink_grant). */
630 void osc_update_next_shrink(struct client_obd *cli)
632 cli->cl_next_shrink_grant =
633 cfs_time_shift(cli->cl_grant_shrink_interval);
634 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
635 cli->cl_next_shrink_grant);
/* Add @grant bytes back to the client's available grant, under the
 * grant/dirty accounting lock. */
638 static void __osc_update_grant(struct client_obd *cli, u64 grant)
640 spin_lock(&cli->cl_loi_list_lock);
641 cli->cl_avail_grant += grant;
642 spin_unlock(&cli->cl_loi_list_lock);
/* If the reply carries a grant (OBD_MD_FLGRANT), credit it to the
 * client's available grant. */
645 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
647 if (body->oa.o_valid & OBD_MD_FLGRANT) {
648 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
649 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: needed by osc_shrink_grant_to_target below,
 * which sends the shrink via a KEY_GRANT_SHRINK set_info RPC. */
653 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
654 u32 keylen, void *key,
655 u32 vallen, void *val,
656 struct ptlrpc_request_set *set);
/* Reply interpreter for grant-shrink RPCs.  On failure the grant we
 * tried to give back (stashed in aa->aa_oa->o_grant) is re-credited
 * locally; on success the server's reply grant is applied instead. */
658 static int osc_shrink_grant_interpret(const struct lu_env *env,
659 struct ptlrpc_request *req,
662 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
663 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
664 struct ost_body *body;
/* shrink failed: take the grant back so it isn't lost */
667 __osc_update_grant(cli, oa->o_grant);
671 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
673 osc_update_grant(cli, body);
/* Give back a quarter of the available grant by piggybacking it on an
 * outgoing RPC: move the amount from cl_avail_grant into oa->o_grant
 * and flag the obdo with OBD_FL_SHRINK_GRANT. */
679 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
681 spin_lock(&cli->cl_loi_list_lock);
682 oa->o_grant = cli->cl_avail_grant / 4;
683 cli->cl_avail_grant -= oa->o_grant;
684 spin_unlock(&cli->cl_loi_list_lock);
685 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
686 oa->o_valid |= OBD_MD_FLFLAGS;
689 oa->o_flags |= OBD_FL_SHRINK_GRANT;
690 osc_update_next_shrink(cli);
693 /* Shrink the current grant, either from some large amount to enough for a
694 * full set of in-flight RPCs, or if we have already shrunk to that limit
695 * then to enough for a single RPC. This avoids keeping more grant than
696 * needed, and avoids shrinking the grant piecemeal. */
697 static int osc_shrink_grant(struct client_obd *cli)
/* first target: enough grant for max_rpcs_in_flight+1 full RPCs */
699 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
700 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
702 spin_lock(&cli->cl_loi_list_lock);
/* already at (or below) the pipeline target: shrink to one RPC's worth */
703 if (cli->cl_avail_grant <= target_bytes)
704 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
705 spin_unlock(&cli->cl_loi_list_lock);
707 return osc_shrink_grant_to_target(cli, target_bytes);
/* Shrink the client's available grant down to @target_bytes by sending
 * the difference back to the OST via a KEY_GRANT_SHRINK set_info RPC.
 * If the RPC cannot be sent the grant is re-credited locally. */
710 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
713 struct ost_body *body;
716 spin_lock(&cli->cl_loi_list_lock);
717 /* Don't shrink if we are already above or below the desired limit
718 * We don't want to shrink below a single RPC, as that will negatively
719 * impact block allocation and long-term performance. */
720 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
721 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
723 if (target_bytes >= cli->cl_avail_grant) {
724 spin_unlock(&cli->cl_loi_list_lock);
727 spin_unlock(&cli->cl_loi_list_lock);
733 osc_announce_cached(cli, &body->oa, 0);
735 spin_lock(&cli->cl_loi_list_lock);
/* everything above the target travels back to the server in o_grant */
736 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
737 cli->cl_avail_grant = target_bytes;
738 spin_unlock(&cli->cl_loi_list_lock);
739 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
740 body->oa.o_valid |= OBD_MD_FLFLAGS;
741 body->oa.o_flags = 0;
743 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
744 osc_update_next_shrink(cli);
746 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
747 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
748 sizeof(*body), body, NULL);
/* send failed: restore the grant we tried to give back */
750 __osc_update_grant(cli, body->oa.o_grant);
/* Decide whether it is time to shrink this client's grant: requires the
 * server to support OBD_CONNECT_GRANT_SHRINK, the shrink interval to
 * have (nearly) elapsed, a FULL import, and more available grant than a
 * single RPC needs. */
755 static int osc_should_shrink_grant(struct client_obd *client)
757 cfs_time_t time = cfs_time_current();
758 cfs_time_t next_shrink = client->cl_next_shrink_grant;
760 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
761 OBD_CONNECT_GRANT_SHRINK) == 0)
/* the 5*CFS_TICK slack lets a check slightly early still count */
764 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
765 /* Get the current RPC size directly, instead of going via:
766 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
767 * Keep comment here so that it can be found by searching. */
768 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
770 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
771 client->cl_avail_grant > brw_size)
/* not worth shrinking yet: just rearm the timer */
774 osc_update_next_shrink(client);
/* Periodic timeout callback: walk every client on the grant-shrink list
 * and shrink those whose interval has expired. */
779 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
781 struct client_obd *client;
783 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
784 if (osc_should_shrink_grant(client))
785 osc_shrink_grant(client);
/* Register @client with the periodic grant-shrink timeout machinery and
 * arm its first shrink deadline. */
790 static int osc_add_shrink_grant(struct client_obd *client)
794 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
796 osc_grant_shrink_grant_cb, NULL,
797 &client->cl_grant_shrink_list);
799 CERROR("add grant client %s error %d\n", cli_name(client), rc);
802 CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
803 osc_update_next_shrink(client);
/* Unregister @client from the periodic grant-shrink machinery. */
807 static int osc_del_shrink_grant(struct client_obd *client)
809 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize grant accounting from the server's connect data @ocd:
 * derive cl_avail_grant from ocd_grant (less reserved/dirty amounts
 * unless we were just evicted), and set the extent-tax, chunk-size and
 * max-extent parameters when the server supports GRANT_PARAM.  Finally
 * enroll in periodic grant shrinking if the server supports it. */
813 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
816 * ocd_grant is the total grant amount we're expect to hold: if we've
817 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
818 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
821 * race is tolerable here: if we're evicted, but imp_state already
822 * left EVICTED state, then cl_dirty_pages must be 0 already.
824 spin_lock(&cli->cl_loi_list_lock);
825 cli->cl_avail_grant = ocd->ocd_grant;
826 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
827 cli->cl_avail_grant -= cli->cl_reserved_grant;
828 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
829 cli->cl_avail_grant -= cli->cl_dirty_grant;
831 cli->cl_avail_grant -=
832 cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
835 if (cli->cl_avail_grant < 0) {
836 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
837 cli_name(cli), cli->cl_avail_grant,
838 ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
839 /* workaround for servers which do not have the patch from
841 cli->cl_avail_grant = ocd->ocd_grant;
844 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
847 /* overhead for each extent insertion */
848 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
849 /* determine the appropriate chunk size used by osc_extent. */
850 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT,
851 ocd->ocd_grant_blkbits);
852 /* determine maximum extent size, in #pages */
853 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
854 cli->cl_max_extent_pages = size >> PAGE_CACHE_SHIFT;
855 if (cli->cl_max_extent_pages == 0)
856 cli->cl_max_extent_pages = 1;
/* no GRANT_PARAM: fall back to page-sized chunks and a fixed cap */
858 cli->cl_grant_extent_tax = 0;
859 cli->cl_chunkbits = PAGE_CACHE_SHIFT;
860 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
862 spin_unlock(&cli->cl_loi_list_lock);
864 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
865 "chunk bits: %d cl_max_extent_pages: %d\n",
867 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
868 cli->cl_max_extent_pages);
870 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
871 list_empty(&cli->cl_grant_shrink_list))
872 osc_add_shrink_grant(cli);
875 /* We assume that the reason this OSC got a short read is because it read
876 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
877 * via the LOV, and it _knows_ it's reading inside the file, it's just that
878 * this stripe never got written at or beyond this stripe offset yet. */
879 static void handle_short_read(int nob_read, size_t page_count,
880 struct brw_page **pga)
885 /* skip bytes read OK */
886 while (nob_read > 0) {
887 LASSERT (page_count > 0);
889 if (pga[i]->count > nob_read) {
890 /* EOF inside this page */
/* zero only the tail of this page past the bytes actually read */
891 ptr = kmap(pga[i]->pg) +
892 (pga[i]->off & ~PAGE_MASK);
893 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
900 nob_read -= pga[i]->count;
905 /* zero remaining pages */
906 while (page_count-- > 0) {
907 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
908 memset(ptr, 0, pga[i]->count);
/* Validate the per-niobuf return codes in a BRW_WRITE reply: the RCS
 * vector must be present, every rc must be exactly 0 (negative -> error
 * propagated, positive -> protocol violation), and the bulk must have
 * transferred exactly the requested byte count. */
914 static int check_write_rcs(struct ptlrpc_request *req,
915 int requested_nob, int niocount,
916 size_t page_count, struct brw_page **pga)
921 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
922 sizeof(*remote_rcs) *
924 if (remote_rcs == NULL) {
925 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
929 /* return error if any niobuf was in error */
930 for (i = 0; i < niocount; i++) {
931 if ((int)remote_rcs[i] < 0)
932 return(remote_rcs[i]);
/* nonzero positive rc is not a valid per-niobuf result */
934 if (remote_rcs[i] != 0) {
935 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
936 i, remote_rcs[i], req);
941 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
942 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
943 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages can share one niobuf iff they are byte-contiguous
 * (p1 ends where p2 starts).  Differing flags are tolerated only for
 * the listed "benign" bits; unknown flag mismatches are warned about
 * but still treated per the contiguity test below. */
950 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
952 if (p1->flag != p2->flag) {
953 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
954 OBD_BRW_SYNC | OBD_BRW_ASYNC |
955 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
957 /* warn if we try to combine flags that we don't know to be
959 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
960 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
961 "report this at https://jira.hpdd.intel.com/\n",
967 return (p1->off + p1->count == p2->off);
/* Compute the bulk-I/O checksum over the first @nob bytes of the page
 * array @pga using the hash algorithm selected by @cksum_type.  The two
 * OBD_FAIL_CHECK branches deliberately corrupt data / checksum to
 * exercise the checksum-failure paths in testing. */
970 static u32 osc_checksum_bulk(int nob, size_t pg_count,
971 struct brw_page **pga, int opc,
972 cksum_type_t cksum_type)
976 struct cfs_crypto_hash_desc *hdesc;
977 unsigned int bufsize;
979 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
981 LASSERT(pg_count > 0);
983 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
985 CERROR("Unable to initialize checksum hash %s\n",
986 cfs_crypto_hash_name(cfs_alg));
987 return PTR_ERR(hdesc);
990 while (nob > 0 && pg_count > 0) {
991 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
993 /* corrupt the data before we compute the checksum, to
994 * simulate an OST->client data error */
995 if (i == 0 && opc == OST_READ &&
996 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
997 unsigned char *ptr = kmap(pga[i]->pg);
998 int off = pga[i]->off & ~PAGE_MASK;
1000 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1003 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1004 pga[i]->off & ~PAGE_MASK,
1006 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1007 (int)(pga[i]->off & ~PAGE_MASK));
1009 nob -= pga[i]->count;
1014 bufsize = sizeof(cksum);
1015 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1017 /* For sending we only compute the wrong checksum instead
1018 * of corrupting the data so it is still correct on a redo */
1019 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build (but do not send) a bulk read/write RPC for @page_count pages:
 * allocate the request (writes come from the shared request pool),
 * merge contiguous pages into niobufs, attach the bulk descriptor,
 * announce cached/grant state, optionally checksum write data, and
 * stash the async args.  On success *reqp holds the prepared request.
 * @resend marks a recovery resend (sets OBD_FL_RECOV_RESEND). */
1026 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1027 u32 page_count, struct brw_page **pga,
1028 struct ptlrpc_request **reqp, int resend)
1030 struct ptlrpc_request *req;
1031 struct ptlrpc_bulk_desc *desc;
1032 struct ost_body *body;
1033 struct obd_ioobj *ioobj;
1034 struct niobuf_remote *niobuf;
1035 int niocount, i, requested_nob, opc, rc;
1036 struct osc_brw_async_args *aa;
1037 struct req_capsule *pill;
1038 struct brw_page *pg_prev;
/* fault-injection points for testing the error paths */
1041 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1042 RETURN(-ENOMEM); /* Recoverable */
1043 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1044 RETURN(-EINVAL); /* Fatal */
1046 if ((cmd & OBD_BRW_WRITE) != 0) {
1048 req = ptlrpc_request_alloc_pool(cli->cl_import,
1050 &RQF_OST_BRW_WRITE);
1053 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* count niobufs: adjacent mergeable pages share one niobuf */
1058 for (niocount = i = 1; i < page_count; i++) {
1059 if (!can_merge_pages(pga[i - 1], pga[i]))
1063 pill = &req->rq_pill;
1064 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1066 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1067 niocount * sizeof(*niobuf));
1069 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1071 ptlrpc_request_free(req);
1074 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1075 ptlrpc_at_set_req_timeout(req);
1076 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1078 req->rq_no_retry_einprogress = 1;
1080 desc = ptlrpc_prep_bulk_imp(req, page_count,
1081 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1082 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1083 PTLRPC_BULK_PUT_SINK) |
1084 PTLRPC_BULK_BUF_KIOV,
1086 &ptlrpc_bulk_kiov_pin_ops);
1089 GOTO(out, rc = -ENOMEM);
1090 /* NB request now owns desc and will free it when it gets freed */
1092 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1093 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1094 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1095 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1097 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1099 obdo_to_ioobj(oa, ioobj);
1100 ioobj->ioo_bufcnt = niocount;
1101 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1102 * that might be send for this request. The actual number is decided
1103 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1104 * "max - 1" for old client compatibility sending "0", and also so the
1105 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1106 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1107 LASSERT(page_count > 0);
/* add each page to the bulk descriptor, merging contiguous pages into
 * the current niobuf; pages must be sorted by offset with no gaps
 * except at the very start/end (enforced by the LASSERTFs) */
1109 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1110 struct brw_page *pg = pga[i];
1111 int poff = pg->off & ~PAGE_MASK;
1113 LASSERT(pg->count > 0);
1114 /* make sure there is no gap in the middle of page array */
1115 LASSERTF(page_count == 1 ||
1116 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1117 ergo(i > 0 && i < page_count - 1,
1118 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1119 ergo(i == page_count - 1, poff == 0)),
1120 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1121 i, page_count, pg, pg->off, pg->count);
1122 LASSERTF(i == 0 || pg->off > pg_prev->off,
1123 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1124 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1126 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1127 pg_prev->pg, page_private(pg_prev->pg),
1128 pg_prev->pg->index, pg_prev->off);
1129 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1130 (pg->flag & OBD_BRW_SRVLOCK));
1132 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1133 requested_nob += pg->count;
1135 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1137 niobuf->rnb_len += pg->count;
1139 niobuf->rnb_offset = pg->off;
1140 niobuf->rnb_len = pg->count;
1141 niobuf->rnb_flags = pg->flag;
1146 LASSERTF((void *)(niobuf - niocount) ==
1147 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1148 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1149 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1151 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1153 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1154 body->oa.o_valid |= OBD_MD_FLFLAGS;
1155 body->oa.o_flags = 0;
1157 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
/* opportunistically return some grant along with this RPC */
1160 if (osc_should_shrink_grant(cli))
1161 osc_shrink_grant_local(cli, &body->oa);
1163 /* size[REQ_REC_OFF] still sizeof (*body) */
1164 if (opc == OST_WRITE) {
1165 if (cli->cl_checksum &&
1166 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1167 /* store cl_cksum_type in a local variable since
1168 * it can be changed via lprocfs */
1169 cksum_type_t cksum_type = cli->cl_cksum_type;
1171 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1172 oa->o_flags &= OBD_FL_LOCAL_MASK;
1173 body->oa.o_flags = 0;
1175 body->oa.o_flags |= cksum_type_pack(cksum_type);
1176 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1177 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1181 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1183 /* save this in 'oa', too, for later checking */
1184 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1185 oa->o_flags |= cksum_type_pack(cksum_type);
1187 /* clear out the checksum flag, in case this is a
1188 * resend but cl_checksum is no longer set. b=11238 */
1189 oa->o_valid &= ~OBD_MD_FLCKSUM;
1191 oa->o_cksum = body->oa.o_cksum;
1192 /* 1 RC per niobuf */
1193 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1194 sizeof(__u32) * niocount);
/* read: ask the server to checksum what it sends back */
1196 if (cli->cl_checksum &&
1197 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1198 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1199 body->oa.o_flags = 0;
1200 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1201 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1204 ptlrpc_request_set_replen(req);
1206 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1207 aa = ptlrpc_req_async_args(req);
1209 aa->aa_requested_nob = requested_nob;
1210 aa->aa_nio_count = niocount;
1211 aa->aa_page_count = page_count;
1215 INIT_LIST_HEAD(&aa->aa_oaps);
1218 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1219 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1220 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1221 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1225 ptlrpc_req_finished(req);
/* Diagnose a write-checksum mismatch: recompute the checksum over the
 * local pages and compare against both the client's original and the
 * server's value to classify where the corruption happened (server used
 * wrong type, client data changed after checksumming — e.g. mmap IO —
 * or corruption in transit), then log a console error with full
 * object/extent details. */
1229 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1230 __u32 client_cksum, __u32 server_cksum, int nob,
1231 size_t page_count, struct brw_page **pga,
1232 cksum_type_t client_cksum_type)
1236 cksum_type_t cksum_type;
1238 if (server_cksum == client_cksum) {
1239 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* recompute with the type the server actually used, which may differ
 * from the type we requested */
1243 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1245 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1248 if (cksum_type != client_cksum_type)
1249 msg = "the server did not use the checksum type specified in "
1250 "the original request - likely a protocol problem";
1251 else if (new_cksum == server_cksum)
1252 msg = "changed on the client after we checksummed it - "
1253 "likely false positive due to mmap IO (bug 11742)";
1254 else if (new_cksum == client_cksum)
1255 msg = "changed in transit before arrival at OST";
1257 msg = "changed in transit AND doesn't match the original - "
1258 "likely false positive due to mmap IO (bug 11742)";
1260 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1261 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1262 msg, libcfs_nid2str(peer->nid),
1263 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1264 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1265 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1266 POSTID(&oa->o_oi), pga[0]->off,
1267 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1268 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1269 "client csum now %x\n", client_cksum, client_cksum_type,
1270 server_cksum, cksum_type, new_cksum);
1274 /* Note rc enters this function as number of bytes transferred */
/*
 * Complete a BRW (bulk read/write) request: unpack the reply body,
 * update quota and grant state, verify checksums for both the write
 * (against the server-reported value) and the read (recomputed locally)
 * paths, and copy the returned obdo back into the async args.
 *
 * NOTE(review): this listing elides many lines (ENTRY/RETURN, braces,
 * some locals such as 'router') -- comments describe visible logic only.
 */
1275 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1277 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1278 const lnet_process_id_t *peer =
1279 &req->rq_import->imp_connection->c_peer;
1280 struct client_obd *cli = aa->aa_cli;
1281 struct ost_body *body;
1282 u32 client_cksum = 0;
/* -EDQUOT still carries a usable reply (quota flags below) */
1285 if (rc < 0 && rc != -EDQUOT) {
1286 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1290 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1291 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1293 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1297 /* set/clear over quota flag for a uid/gid */
1298 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1299 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1300 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1302 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1303 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1305 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
/* refresh the client's grant accounting from the reply */
1308 osc_update_grant(cli, body);
1313 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1314 client_cksum = aa->aa_oa->o_cksum; /* save for later */
/* ---- write completion path ---- */
1316 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1318 CERROR("Unexpected +ve rc %d\n", rc);
1321 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1323 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* compare our pre-send checksum with the server's */
1326 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1327 check_write_checksum(&body->oa, peer, client_cksum,
1328 body->oa.o_cksum, aa->aa_requested_nob,
1329 aa->aa_page_count, aa->aa_ppga,
1330 cksum_type_unpack(aa->aa_oa->o_flags)))
1333 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1334 aa->aa_page_count, aa->aa_ppga);
1338 /* The rest of this function executes only for OST_READs */
1340 /* if unwrap_bulk failed, return -EAGAIN to retry */
1341 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1343 GOTO(out, rc = -EAGAIN);
/* sanity: server can't have sent more than we asked for */
1345 if (rc > aa->aa_requested_nob) {
1346 CERROR("Unexpected rc %d (%d requested)\n", rc,
1347 aa->aa_requested_nob);
1351 if (rc != req->rq_bulk->bd_nob_transferred) {
1352 CERROR ("Unexpected rc %d (%d transferred)\n",
1353 rc, req->rq_bulk->bd_nob_transferred);
/* zero-fill the pages past EOF on a short read */
1357 if (rc < aa->aa_requested_nob)
1358 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* verify read data against the server-supplied checksum */
1360 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1361 static int cksum_counter;
1362 u32 server_cksum = body->oa.o_cksum;
1365 cksum_type_t cksum_type;
1367 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1368 body->oa.o_flags : 0);
1369 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1370 aa->aa_ppga, OST_READ,
/* data may have passed through an LNET router */
1373 if (peer->nid != req->rq_bulk->bd_sender) {
1375 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1378 if (server_cksum != client_cksum) {
1379 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1380 "%s%s%s inode "DFID" object "DOSTID
1381 " extent ["LPU64"-"LPU64"]\n",
1382 req->rq_import->imp_obd->obd_name,
1383 libcfs_nid2str(peer->nid),
1385 body->oa.o_valid & OBD_MD_FLFID ?
1386 body->oa.o_parent_seq : (__u64)0,
1387 body->oa.o_valid & OBD_MD_FLFID ?
1388 body->oa.o_parent_oid : 0,
1389 body->oa.o_valid & OBD_MD_FLFID ?
1390 body->oa.o_parent_ver : 0,
1391 POSTID(&body->oa.o_oi),
1392 aa->aa_ppga[0]->off,
1393 aa->aa_ppga[aa->aa_page_count-1]->off +
1394 aa->aa_ppga[aa->aa_page_count-1]->count -
1396 CERROR("client %x, server %x, cksum_type %x\n",
1397 client_cksum, server_cksum, cksum_type);
1399 aa->aa_oa->o_cksum = client_cksum;
1403 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* server didn't send a checksum even though we asked for one;
 * log with exponential backoff (power-of-two counter test) */
1406 } else if (unlikely(client_cksum)) {
1407 static int cksum_missed;
1410 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1411 CERROR("Checksum %u requested from %s but not sent\n",
1412 cksum_missed, libcfs_nid2str(peer->nid));
/* propagate server-updated obdo fields back to the caller's oa */
1418 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1419 aa->aa_oa, &body->oa);
/*
 * Rebuild and resend a BRW request after a recoverable failure
 * (e.g. -EINPROGRESS from the server).  A brand-new request is prepped
 * from the old async args; the pga array and oap/extent lists are moved
 * (not copied) to the new request, which then takes over the old
 * request's interpret/commit callbacks and import generation.
 *
 * NOTE(review): ENTRY/RETURN lines and some early-exit branches are
 * elided in this listing.
 */
1424 static int osc_brw_redo_request(struct ptlrpc_request *request,
1425 struct osc_brw_async_args *aa, int rc)
1427 struct ptlrpc_request *new_req;
1428 struct osc_brw_async_args *new_aa;
1429 struct osc_async_page *oap;
1432 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1433 "redo for recoverable error %d", rc);
/* build the replacement request with the same cmd/pages/oa */
1435 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1436 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1437 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1438 aa->aa_ppga, &new_req, 1);
/* abort the resend if any page's original caller was interrupted */
1442 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1443 if (oap->oap_request != NULL) {
1444 LASSERTF(request == oap->oap_request,
1445 "request %p != oap_request %p\n",
1446 request, oap->oap_request);
1447 if (oap->oap_interrupted) {
1448 ptlrpc_req_finished(new_req);
1453 /* New request takes over pga and oaps from old request.
1454 * Note that copying a list_head doesn't work, need to move it... */
1456 new_req->rq_interpret_reply = request->rq_interpret_reply;
1457 new_req->rq_async_args = request->rq_async_args;
1458 new_req->rq_commit_cb = request->rq_commit_cb;
1459 /* cap resend delay to the current request timeout, this is similar to
1460 * what ptlrpc does (see after_reply()) */
1461 if (aa->aa_resends > new_req->rq_timeout)
1462 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1464 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1465 new_req->rq_generation_set = 1;
1466 new_req->rq_import_generation = request->rq_import_generation;
1468 new_aa = ptlrpc_req_async_args(new_req);
/* splice (move) the page and extent lists into the new args */
1470 INIT_LIST_HEAD(&new_aa->aa_oaps);
1471 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1472 INIT_LIST_HEAD(&new_aa->aa_exts);
1473 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1474 new_aa->aa_resends = aa->aa_resends;
/* repoint each oap's request reference from the old req to the new */
1476 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1477 if (oap->oap_request) {
1478 ptlrpc_req_finished(oap->oap_request);
1479 oap->oap_request = ptlrpc_request_addref(new_req);
1483 /* XXX: This code will run into problem if we're going to support
1484 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1485 * and wait for all of them to be finished. We should inherit request
1486 * set from old request. */
1487 ptlrpcd_add_req(new_req);
1489 DEBUG_REQ(D_INFO, new_req, "new request");
1494 * ugh, we want disk allocation on the target to happen in offset order. we'll
1495 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1496 * fine for our small page arrays and doesn't require allocation. its an
1497 * insertion sort that swaps elements that are strides apart, shrinking the
1498 * stride down until its '1' and the array is sorted.
/*
 * Sort @array of @num brw_page pointers ascending by ->off (shellsort).
 * NOTE(review): several lines are elided in this listing (locals i/j,
 * the tmp save/restore inside the inner loop, stride shrink step).
 */
1500 static void sort_brw_pages(struct brw_page **array, int num)
1503 struct brw_page *tmp;
/* grow the stride with the 3n+1 sequence, then shrink it back down */
1507 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1512 for (i = stride ; i < num ; i++) {
/* insertion step: shift stride-apart elements with larger offsets up */
1515 while (j >= stride && array[j - stride]->off > tmp->off) {
1516 array[j] = array[j - stride];
1521 } while (stride > 1);
/*
 * Free the brw_page pointer array allocated for a BRW RPC.
 * NOTE(review): lines are elided in this listing; only the array itself
 * is visibly freed here -- confirm against the full source whether the
 * individual pages are released elsewhere.
 */
1524 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1526 LASSERT(ppga != NULL);
1527 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Interpret callback for a BRW RPC: finish the request, retry on
 * recoverable errors (always for -EINPROGRESS), propagate returned
 * attributes (size/blocks/times/KMS) to the cl_object, finish all
 * extents, release the page array, and update in-flight RPC accounting.
 *
 * NOTE(review): ENTRY/RETURN, some braces and a few statements are
 * elided in this listing; comments cover the visible logic only.
 */
1530 static int brw_interpret(const struct lu_env *env,
1531 struct ptlrpc_request *req, void *data, int rc)
1533 struct osc_brw_async_args *aa = data;
1534 struct osc_extent *ext;
1535 struct osc_extent *tmp;
1536 struct client_obd *cli = aa->aa_cli;
1539 rc = osc_brw_fini_request(req, rc);
1540 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1541 /* When server return -EINPROGRESS, client should always retry
1542 * regardless of the number of times the bulk was resent already. */
1543 if (osc_recoverable_error(rc)) {
/* don't resend across an eviction/reconnect boundary */
1544 if (req->rq_import_generation !=
1545 req->rq_import->imp_generation) {
1546 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1547 ""DOSTID", rc = %d.\n",
1548 req->rq_import->imp_obd->obd_name,
1549 POSTID(&aa->aa_oa->o_oi), rc);
1550 } else if (rc == -EINPROGRESS ||
1551 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1552 rc = osc_brw_redo_request(req, aa, rc);
1554 CERROR("%s: too many resent retries for object: "
1555 ""LPU64":"LPU64", rc = %d.\n",
1556 req->rq_import->imp_obd->obd_name,
1557 POSTID(&aa->aa_oa->o_oi), rc);
1562 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* success path: push the attributes the OST returned into the
 * cl_object cache under the attr lock */
1567 struct obdo *oa = aa->aa_oa;
1568 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1569 unsigned long valid = 0;
1570 struct cl_object *obj;
1571 struct osc_async_page *last;
1573 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1574 obj = osc2cl(last->oap_obj);
1576 cl_object_attr_lock(obj);
1577 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1578 attr->cat_blocks = oa->o_blocks;
1579 valid |= CAT_BLOCKS;
1581 if (oa->o_valid & OBD_MD_FLMTIME) {
1582 attr->cat_mtime = oa->o_mtime;
1585 if (oa->o_valid & OBD_MD_FLATIME) {
1586 attr->cat_atime = oa->o_atime;
1589 if (oa->o_valid & OBD_MD_FLCTIME) {
1590 attr->cat_ctime = oa->o_ctime;
1594 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1595 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1596 loff_t last_off = last->oap_count + last->oap_obj_off +
1599 /* Change file size if this is an out of quota or
1600 * direct IO write and it extends the file size */
1601 if (loi->loi_lvb.lvb_size < last_off) {
1602 attr->cat_size = last_off;
1605 /* Extend KMS if it's not a lockless write */
1606 if (loi->loi_kms < last_off &&
1607 oap2osc_page(last)->ops_srvlock == 0) {
1608 attr->cat_kms = last_off;
1614 cl_object_attr_update(env, obj, attr, valid);
1615 cl_object_attr_unlock(obj);
1617 OBDO_FREE(aa->aa_oa);
/* track unstable (written-but-uncommitted) pages for writes */
1619 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1620 osc_inc_unstable_pages(req);
/* finish every extent covered by this RPC */
1622 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1623 list_del_init(&ext->oe_link);
1624 osc_extent_finish(env, ext, 1, rc);
1626 LASSERT(list_empty(&aa->aa_exts));
1627 LASSERT(list_empty(&aa->aa_oaps));
1629 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1630 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1632 spin_lock(&cli->cl_loi_list_lock);
1633 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1634 * is called so we know whether to go to sync BRWs or wait for more
1635 * RPCs to complete */
1636 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1637 cli->cl_w_in_flight--;
1639 cli->cl_r_in_flight--;
1640 osc_wake_cache_waiters(cli);
1641 spin_unlock(&cli->cl_loi_list_lock);
/* an RPC slot just freed up -- try to send more queued IO */
1643 osc_io_unplug(env, cli, NULL);
/*
 * Commit callback (rq_commit_cb) for a BRW request: once the server has
 * committed the transaction, clear rq_unstable under rq_lock and drop
 * the unstable-page accounting.  The lock resolves the race with
 * osc_inc_unstable_pages() noted below.
 */
1647 static void brw_commit(struct ptlrpc_request *req)
1649 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1650 * this called via the rq_commit_cb, I need to ensure
1651 * osc_dec_unstable_pages is still called. Otherwise unstable
1652 * pages may be leaked. */
1653 spin_lock(&req->rq_lock);
1654 if (likely(req->rq_unstable)) {
1655 req->rq_unstable = 0;
1656 spin_unlock(&req->rq_lock);
/* drop the lock before the potentially heavier accounting call */
1658 osc_dec_unstable_pages(req);
/* inc path hasn't run yet; mark committed so it can clean up */
1660 req->rq_committed = 1;
1661 spin_unlock(&req->rq_lock);
1666 * Build an RPC by the list of extent @ext_list. The caller must ensure
1667 * that the total pages in this list are NOT over max pages per RPC.
1668 * Extents in the list must be in OES_RPC state.
/*
 * Collect the pages of all extents in @ext_list into one brw_page array,
 * prep a BRW request, attach interpret/commit callbacks and async args,
 * update in-flight/lproc statistics, and hand the request to ptlrpcd.
 * On failure, every extent is finished with an error so waiters are
 * released.
 *
 * NOTE(review): ENTRY/RETURN, obdo allocation, and several braces are
 * elided in this listing; comments cover the visible logic only.
 */
1670 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1671 struct list_head *ext_list, int cmd)
1673 struct ptlrpc_request *req = NULL;
1674 struct osc_extent *ext;
1675 struct brw_page **pga = NULL;
1676 struct osc_brw_async_args *aa = NULL;
1677 struct obdo *oa = NULL;
1678 struct osc_async_page *oap;
1679 struct osc_object *obj = NULL;
1680 struct cl_req_attr *crattr = NULL;
1681 loff_t starting_offset = OBD_OBJECT_EOF;
1682 loff_t ending_offset = 0;
1686 bool soft_sync = false;
1687 bool interrupted = false;
1691 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1692 struct ost_body *body;
1694 LASSERT(!list_empty(ext_list));
1696 /* add pages into rpc_list to build BRW rpc */
1697 list_for_each_entry(ext, ext_list, oe_link) {
1698 LASSERT(ext->oe_state == OES_RPC);
1699 mem_tight |= ext->oe_memalloc;
1700 grant += ext->oe_grants;
1701 page_count += ext->oe_nr_pages;
1706 soft_sync = osc_over_unstable_soft_limit(cli);
1708 mpflag = cfs_memory_pressure_get_and_set();
1710 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1712 GOTO(out, rc = -ENOMEM);
1716 GOTO(out, rc = -ENOMEM);
/* flatten every extent's pages into pga[] and rpc_list, tracking
 * the RPC's overall [starting_offset, ending_offset) range */
1719 list_for_each_entry(ext, ext_list, oe_link) {
1720 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1722 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1724 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1725 pga[i] = &oap->oap_brw_page;
1726 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1729 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1730 if (starting_offset == OBD_OBJECT_EOF ||
1731 starting_offset > oap->oap_obj_off)
1732 starting_offset = oap->oap_obj_off;
1734 LASSERT(oap->oap_page_off == 0);
1735 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1736 ending_offset = oap->oap_obj_off +
1739 LASSERT(oap->oap_page_off + oap->oap_count ==
1741 if (oap->oap_interrupted)
1746 /* first page in the list */
1747 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
/* fill the obdo (oa) from the cl layer's request attributes */
1749 crattr = &osc_env_info(env)->oti_req_attr;
1750 memset(crattr, 0, sizeof(*crattr));
1751 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1752 crattr->cra_flags = ~0ULL;
1753 crattr->cra_page = oap2cl_page(oap);
1754 crattr->cra_oa = oa;
1755 cl_req_attr_set(env, osc2cl(obj), crattr);
1757 if (cmd == OBD_BRW_WRITE)
1758 oa->o_grant_used = grant;
/* OST wants offset-ordered pages; see sort_brw_pages() */
1760 sort_brw_pages(pga, page_count);
1761 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1763 CERROR("prep_req failed: %d\n", rc);
1767 req->rq_commit_cb = brw_commit;
1768 req->rq_interpret_reply = brw_interpret;
1769 req->rq_memalloc = mem_tight != 0;
1770 oap->oap_request = ptlrpc_request_addref(req);
1771 if (interrupted && !req->rq_intr)
1772 ptlrpc_mark_interrupted(req);
1774 /* Need to update the timestamps after the request is built in case
1775 * we race with setattr (locally or in queue at OST). If OST gets
1776 * later setattr before earlier BRW (as determined by the request xid),
1777 * the OST will not use BRW timestamps. Sadly, there is no obvious
1778 * way to do this in a single call. bug 10150 */
1779 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1780 crattr->cra_oa = &body->oa;
1781 crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1782 cl_req_attr_set(env, osc2cl(obj), crattr);
1783 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1785 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1786 aa = ptlrpc_req_async_args(req);
/* move page and extent lists into the request's async args */
1787 INIT_LIST_HEAD(&aa->aa_oaps);
1788 list_splice_init(&rpc_list, &aa->aa_oaps);
1789 INIT_LIST_HEAD(&aa->aa_exts);
1790 list_splice_init(ext_list, &aa->aa_exts);
/* account the in-flight RPC and feed the lproc histograms */
1792 spin_lock(&cli->cl_loi_list_lock);
1793 starting_offset >>= PAGE_CACHE_SHIFT;
1794 if (cmd == OBD_BRW_READ) {
1795 cli->cl_r_in_flight++;
1796 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1797 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1798 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1799 starting_offset + 1);
1801 cli->cl_w_in_flight++;
1802 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1803 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1804 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1805 starting_offset + 1);
1807 spin_unlock(&cli->cl_loi_list_lock);
1809 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1810 page_count, aa, cli->cl_r_in_flight,
1811 cli->cl_w_in_flight);
1813 ptlrpcd_add_req(req);
1819 cfs_memory_pressure_restore(mpflag);
/* error path: free what we built and fail every extent */
1822 LASSERT(req == NULL);
1827 OBD_FREE(pga, sizeof(*pga) * page_count);
1828 /* this should happen rarely and is pretty bad, it makes the
1829 * pending list not follow the dirty order */
1830 while (!list_empty(ext_list)) {
1831 ext = list_entry(ext_list->next, struct osc_extent,
1833 list_del_init(&ext->oe_link);
1834 osc_extent_finish(env, ext, 0, rc);
/*
 * Attach @einfo->ei_cbdata to @lock (under the lock's resource lock),
 * after asserting the lock's callbacks/type match what the enqueue info
 * expects.  Visible logic sets l_ast_data only when it is currently
 * unset; the return value (set when l_ast_data == data) is computed on
 * lines elided from this listing.
 */
1840 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1841 struct ldlm_enqueue_info *einfo)
1843 void *data = einfo->ei_cbdata;
1846 LASSERT(lock != NULL);
1847 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1848 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1849 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1850 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1852 lock_res_and_lock(lock);
/* only claim the lock's ast_data if nobody owns it yet */
1854 if (lock->l_ast_data == NULL)
1855 lock->l_ast_data = data;
1856 if (lock->l_ast_data == data)
1859 unlock_res_and_lock(lock);
/*
 * Handle-based wrapper around osc_set_lock_data_with_check(): resolve
 * @lockh to a lock, attach einfo's cbdata, and drop the reference.
 * A NULL lock (handle no longer valid -- e.g. after eviction) is
 * reported with the CERROR below; surrounding branches are elided in
 * this listing.
 */
1864 static int osc_set_data_with_check(struct lustre_handle *lockh,
1865 struct ldlm_enqueue_info *einfo)
1867 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1871 set = osc_set_lock_data_with_check(lock, einfo);
1872 LDLM_LOCK_PUT(lock);
1874 CERROR("lockh %p, data %p - client evicted?\n",
1875 lockh, einfo->ei_cbdata);
/*
 * Common completion for an OSC lock enqueue: translate an intent-aborted
 * reply into its real status, mark the LVB ready on success, invoke the
 * caller's upcall, and drop the enqueue reference on the lock handle.
 *
 * NOTE(review): braces and the final return are elided in this listing.
 */
1879 static int osc_enqueue_fini(struct ptlrpc_request *req,
1880 osc_enqueue_upcall_f upcall, void *cookie,
1881 struct lustre_handle *lockh, enum ldlm_mode mode,
1882 __u64 *flags, int agl, int errcode)
1884 bool intent = *flags & LDLM_FL_HAS_INTENT;
1888 /* The request was created before ldlm_cli_enqueue call. */
1889 if (intent && errcode == ELDLM_LOCK_ABORTED) {
1890 struct ldlm_reply *rep;
1892 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1893 LASSERT(rep != NULL);
/* server sent the intent result in lock_policy_res1 */
1895 rep->lock_policy_res1 =
1896 ptlrpc_status_ntoh(rep->lock_policy_res1);
1897 if (rep->lock_policy_res1)
1898 errcode = rep->lock_policy_res1;
1900 *flags |= LDLM_FL_LVB_READY;
1901 } else if (errcode == ELDLM_OK) {
1902 *flags |= LDLM_FL_LVB_READY;
1905 /* Call the update callback. */
1906 rc = (*upcall)(cookie, lockh, errcode);
1908 /* release the reference taken in ldlm_cli_enqueue() */
1909 if (errcode == ELDLM_LOCK_MATCHED)
1911 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1912 ldlm_lock_decref(lockh, mode);
/*
 * Interpret callback for an async lock enqueue: finish the ldlm enqueue,
 * then run the osc-level completion (osc_enqueue_fini).  Holds an extra
 * lock reference across the upcall so a blocking AST posted for a failed
 * lock cannot race ahead of the upcall.
 *
 * NOTE(review): some lines (the 'flags' local, AGL-specific branch head,
 * RETURN) are elided in this listing.
 */
1917 static int osc_enqueue_interpret(const struct lu_env *env,
1918 struct ptlrpc_request *req,
1919 struct osc_enqueue_args *aa, int rc)
1921 struct ldlm_lock *lock;
1922 struct lustre_handle *lockh = &aa->oa_lockh;
1923 enum ldlm_mode mode = aa->oa_mode;
1924 struct ost_lvb *lvb = aa->oa_lvb;
1925 __u32 lvb_len = sizeof(*lvb);
1930 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
1932 lock = ldlm_handle2lock(lockh);
1933 LASSERTF(lock != NULL,
1934 "lockh "LPX64", req %p, aa %p - client evicted?\n",
1935 lockh->cookie, req, aa);
1937 /* Take an additional reference so that a blocking AST that
1938 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
1939 * to arrive after an upcall has been executed by
1940 * osc_enqueue_fini(). */
1941 ldlm_lock_addref(lockh, mode);
1943 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
1944 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
1946 /* Let CP AST to grant the lock first. */
1947 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* AGL path stores lvb/flags locally for the fini call */
1950 LASSERT(aa->oa_lvb == NULL);
1951 LASSERT(aa->oa_flags == NULL);
1952 aa->oa_flags = &flags;
1955 /* Complete obtaining the lock procedure. */
1956 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
1957 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
1959 /* Complete osc stuff. */
1960 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
1961 aa->oa_flags, aa->oa_agl, rc);
1963 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* drop the extra reference taken above */
1965 ldlm_lock_decref(lockh, mode);
1966 LDLM_LOCK_PUT(lock);
/* Sentinel "request set" pointer: callers pass PTLRPCD_SET as @rqset to
 * request that the RPC be handed to ptlrpcd rather than added to a real
 * set (see the rqset == PTLRPCD_SET test in osc_enqueue_base()). */
1970 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
1972 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
1973 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
1974 * other synchronous requests, however keeping some locks and trying to obtain
1975 * others may take a considerable amount of time in a case of ost failure; and
1976 * when other sync requests do not get released lock from a client, the client
1977 * is evicted from the cluster -- such scenarious make the life difficult, so
1978 * release locks just after they are obtained. */
/*
 * Enqueue an extent lock on an OST object: first try to match an
 * existing cached lock (extended to page boundaries), otherwise build
 * and send an LDLM enqueue -- asynchronously via @rqset/ptlrpcd when
 * @async, synchronously otherwise.  @agl marks speculative (ahead-of-
 * glimpse) enqueues whose result the caller does not wait on.
 *
 * NOTE(review): braces, RETURNs and a few branch heads are elided in
 * this listing; comments cover the visible logic only.
 */
1979 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
1980 __u64 *flags, union ldlm_policy_data *policy,
1981 struct ost_lvb *lvb, int kms_valid,
1982 osc_enqueue_upcall_f upcall, void *cookie,
1983 struct ldlm_enqueue_info *einfo,
1984 struct ptlrpc_request_set *rqset, int async, int agl)
1986 struct obd_device *obd = exp->exp_obd;
1987 struct lustre_handle lockh = { 0 };
1988 struct ptlrpc_request *req = NULL;
1989 int intent = *flags & LDLM_FL_HAS_INTENT;
1990 __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
1991 enum ldlm_mode mode;
1995 /* Filesystem lock extents are extended to page boundaries so that
1996 * dealing with the page cache is a little smoother. */
1997 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
1998 policy->l_extent.end |= ~PAGE_MASK;
2001 * kms is not valid when either object is completely fresh (so that no
2002 * locks are cached), or object was evicted. In the latter case cached
2003 * lock cannot be used, because it would prime inode state with
2004 * potentially stale LVB.
2009 /* Next, search for already existing extent locks that will cover us */
2010 /* If we're trying to read, we also search for an existing PW lock. The
2011 * VFS and page cache already protect us locally, so lots of readers/
2012 * writers can share a single PW lock.
2014 * There are problems with conversion deadlocks, so instead of
2015 * converting a read lock to a write lock, we'll just enqueue a new
2018 * At some point we should cancel the read lock instead of making them
2019 * send us a blocking callback, but there are problems with canceling
2020 * locks out from other users right now, too. */
2021 mode = einfo->ei_mode;
2022 if (einfo->ei_mode == LCK_PR)
2024 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2025 einfo->ei_type, policy, mode, &lockh, 0);
2027 struct ldlm_lock *matched;
2029 if (*flags & LDLM_FL_TEST_LOCK)
2032 matched = ldlm_handle2lock(&lockh);
2034 /* AGL enqueues DLM locks speculatively. Therefore if
2035 * it already exists a DLM lock, it wll just inform the
2036 * caller to cancel the AGL process for this stripe. */
2037 ldlm_lock_decref(&lockh, mode);
2038 LDLM_LOCK_PUT(matched);
2040 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2041 *flags |= LDLM_FL_LVB_READY;
2043 /* We already have a lock, and it's referenced. */
2044 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2046 ldlm_lock_decref(&lockh, mode);
2047 LDLM_LOCK_PUT(matched);
/* matched lock belongs to someone else -- fall through to enqueue */
2050 ldlm_lock_decref(&lockh, mode);
2051 LDLM_LOCK_PUT(matched);
2056 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
/* no usable cached lock: build the enqueue request ourselves so we
 * can size the LVB buffer in the reply */
2060 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2061 &RQF_LDLM_ENQUEUE_LVB);
2065 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2067 ptlrpc_request_free(req);
2071 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2073 ptlrpc_request_set_replen(req);
2076 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2077 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2079 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2080 sizeof(*lvb), LVB_T_OST, &lockh, async);
/* async path: stash completion state and queue the request */
2083 struct osc_enqueue_args *aa;
2084 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2085 aa = ptlrpc_req_async_args(req);
2087 aa->oa_mode = einfo->ei_mode;
2088 aa->oa_type = einfo->ei_type;
2089 lustre_handle_copy(&aa->oa_lockh, &lockh);
2090 aa->oa_upcall = upcall;
2091 aa->oa_cookie = cookie;
2094 aa->oa_flags = flags;
2097 /* AGL is essentially to enqueue an DLM lock
2098 * in advance, so we don't care about the
2099 * result of AGL enqueue. */
2101 aa->oa_flags = NULL;
2104 req->rq_interpret_reply =
2105 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2106 if (rqset == PTLRPCD_SET)
2107 ptlrpcd_add_req(req);
2109 ptlrpc_set_add_req(rqset, req);
2110 } else if (intent) {
2111 ptlrpc_req_finished(req);
/* sync path: run the completion inline */
2116 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2119 ptlrpc_req_finished(req);
/*
 * Match an existing cached extent lock for [start, end] on @res_id,
 * extending the extent to page boundaries first.  When a lock is found,
 * attach @data to it; if that fails the client was likely evicted and
 * the match is abandoned.  On a PW match satisfying a PR request the
 * reference is converted (addref PR / decref PW) so the caller holds
 * the mode it asked for.
 *
 * NOTE(review): the mode-expansion (PR|PW) setup and RETURNs are elided
 * in this listing.
 */
2124 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2125 enum ldlm_type type, union ldlm_policy_data *policy,
2126 enum ldlm_mode mode, __u64 *flags, void *data,
2127 struct lustre_handle *lockh, int unref)
2129 struct obd_device *obd = exp->exp_obd;
2130 __u64 lflags = *flags;
2134 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2137 /* Filesystem lock extents are extended to page boundaries so that
2138 * dealing with the page cache is a little smoother */
2139 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2140 policy->l_extent.end |= ~PAGE_MASK;
2142 /* Next, search for already existing extent locks that will cover us */
2143 /* If we're trying to read, we also search for an existing PW lock. The
2144 * VFS and page cache already protect us locally, so lots of readers/
2145 * writers can share a single PW lock. */
2149 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2150 res_id, type, policy, rc, lockh, unref);
2153 if (!osc_set_data_with_check(lockh, data)) {
2154 if (!(lflags & LDLM_FL_TEST_LOCK))
2155 ldlm_lock_decref(lockh, rc)
2159 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
/* matched PW for a PR request: swap the reference to PR */
2160 ldlm_lock_addref(lockh, LCK_PR);
2161 ldlm_lock_decref(lockh, LCK_PW);
/*
 * Interpret callback for an async OST_STATFS: unpack the obd_statfs
 * reply into the caller's oi_osfs and invoke the oi_cb_up completion.
 * -ENOTCONN/-EAGAIN with OBD_STATFS_NODELAY means the request was
 * deliberately not sent; the caller handles that itself.
 *
 * NOTE(review): ENTRY/GOTO targets and RETURN are elided in this
 * listing.
 */
2168 static int osc_statfs_interpret(const struct lu_env *env,
2169 struct ptlrpc_request *req,
2170 struct osc_async_args *aa, int rc)
2172 struct obd_statfs *msfs;
2176 /* The request has in fact never been sent
2177 * due to issues at a higher level (LOV).
2178 * Exit immediately since the caller is
2179 * aware of the problem and takes care
2180 * of the clean up */
2183 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2184 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2190 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2192 GOTO(out, rc = -EPROTO);
/* copy the server's statfs into the caller-provided buffer */
2195 *aa->aa_oi->oi_osfs = *msfs;
2197 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Send an OST_STATFS asynchronously via @rqset; the reply is delivered
 * to the caller through osc_statfs_interpret() / oinfo->oi_cb_up.
 * OBD_STATFS_NODELAY requests are marked no_resend/no_delay so procfs
 * readers cannot block on a dead import.
 *
 * NOTE(review): ENTRY/RETURN and the aa->aa_oi assignment are elided in
 * this listing.
 */
2201 static int osc_statfs_async(struct obd_export *exp,
2202 struct obd_info *oinfo, __u64 max_age,
2203 struct ptlrpc_request_set *rqset)
2205 struct obd_device *obd = class_exp2obd(exp);
2206 struct ptlrpc_request *req;
2207 struct osc_async_args *aa;
2211 /* We could possibly pass max_age in the request (as an absolute
2212 * timestamp or a "seconds.usec ago") so the target can avoid doing
2213 * extra calls into the filesystem if that isn't necessary (e.g.
2214 * during mount that would help a bit). Having relative timestamps
2215 * is not so great if request processing is slow, while absolute
2216 * timestamps are not ideal because they need time synchronization. */
2217 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2221 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2223 ptlrpc_request_free(req);
2226 ptlrpc_request_set_replen(req);
2227 req->rq_request_portal = OST_CREATE_PORTAL;
2228 ptlrpc_at_set_req_timeout(req);
2230 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2231 /* procfs requests not want stat in wait for avoid deadlock */
2232 req->rq_no_resend = 1;
2233 req->rq_no_delay = 1;
2236 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2237 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2238 aa = ptlrpc_req_async_args(req);
2241 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: take a reference on the import under cl_sem
 * (racing against client_disconnect_export, bug 15684), send the
 * request with ptlrpc_queue_wait(), and copy the reply into @osfs.
 *
 * NOTE(review): ENTRY/RETURN, some braces, the *osfs = *msfs copy and
 * the class_import_put on the normal path are elided in this listing.
 */
2245 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2246 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2248 struct obd_device *obd = class_exp2obd(exp);
2249 struct obd_statfs *msfs;
2250 struct ptlrpc_request *req;
2251 struct obd_import *imp = NULL;
2255 /*Since the request might also come from lprocfs, so we need
2256 *sync this with client_disconnect_export Bug15684*/
2257 down_read(&obd->u.cli.cl_sem);
2258 if (obd->u.cli.cl_import)
2259 imp = class_import_get(obd->u.cli.cl_import);
2260 up_read(&obd->u.cli.cl_sem);
2264 /* We could possibly pass max_age in the request (as an absolute
2265 * timestamp or a "seconds.usec ago") so the target can avoid doing
2266 * extra calls into the filesystem if that isn't necessary (e.g.
2267 * during mount that would help a bit). Having relative timestamps
2268 * is not so great if request processing is slow, while absolute
2269 * timestamps are not ideal because they need time synchronization. */
2270 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* the import reference is no longer needed once the request holds it */
2272 class_import_put(imp);
2277 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2279 ptlrpc_request_free(req);
2282 ptlrpc_request_set_replen(req);
2283 req->rq_request_portal = OST_CREATE_PORTAL;
2284 ptlrpc_at_set_req_timeout(req);
2286 if (flags & OBD_STATFS_NODELAY) {
2287 /* procfs requests not want stat in wait for avoid deadlock */
2288 req->rq_no_resend = 1;
2289 req->rq_no_delay = 1;
2292 rc = ptlrpc_queue_wait(req);
2296 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2298 GOTO(out, rc = -EPROTO);
2305 ptlrpc_req_finished(req);
/*
 * OSC ioctl dispatcher.  Pins the module for the duration of the call,
 * then handles client recovery, import (de)activation and target-ping
 * requests; anything else returns -ENOTTY.
 *
 * NOTE(review): the switch(cmd) head, some break/GOTO lines and the
 * final RETURN are elided in this listing.
 */
2309 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2310 void *karg, void __user *uarg)
2312 struct obd_device *obd = exp->exp_obd;
2313 struct obd_ioctl_data *data = karg;
/* hold a module reference while a userspace ioctl runs inside us */
2317 if (!try_module_get(THIS_MODULE)) {
2318 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2319 module_name(THIS_MODULE));
2323 case OBD_IOC_CLIENT_RECOVER:
2324 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2325 data->ioc_inlbuf1, 0);
2329 case IOC_OSC_SET_ACTIVE:
2330 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2333 case OBD_IOC_PING_TARGET:
2334 err = ptlrpc_obd_ping(obd);
2337 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2338 cmd, current_comm());
2339 GOTO(out, err = -ENOTTY);
2342 module_put(THIS_MODULE);
/*
 * Set a named parameter on this OSC, either handled locally (checksum
 * toggle, sptlrpc config/ctx flush, client cache registration, LRU
 * shrink) or forwarded to the OST as an OST_SET_INFO / grant-shrink RPC.
 *
 * NOTE(review): ENTRY/RETURN, some early-return lines and the grant-
 * shrink oa allocation are elided in this listing.
 */
2346 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2347 u32 keylen, void *key,
2348 u32 vallen, void *val,
2349 struct ptlrpc_request_set *set)
2351 struct ptlrpc_request *req;
2352 struct obd_device *obd = exp->exp_obd;
2353 struct obd_import *imp = class_exp2cliimp(exp);
2358 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* --- keys handled entirely on the client side --- */
2360 if (KEY_IS(KEY_CHECKSUM)) {
2361 if (vallen != sizeof(int))
2363 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2367 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2368 sptlrpc_conf_client_adapt(obd);
2372 if (KEY_IS(KEY_FLUSH_CTX)) {
2373 sptlrpc_import_flush_my_ctx(imp);
2377 if (KEY_IS(KEY_CACHE_SET)) {
2378 struct client_obd *cli = &obd->u.cli;
2380 LASSERT(cli->cl_cache == NULL); /* only once */
2381 cli->cl_cache = (struct cl_client_cache *)val;
2382 cl_cache_incref(cli->cl_cache);
2383 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2385 /* add this osc into entity list */
2386 LASSERT(list_empty(&cli->cl_lru_osc));
2387 spin_lock(&cli->cl_cache->ccc_lru_lock);
2388 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2389 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2394 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2395 struct client_obd *cli = &obd->u.cli;
/* shrink at most half of the LRU, capped by the requested target */
2396 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2397 long target = *(long *)val;
2399 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2404 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2407 /* We pass all other commands directly to OST. Since nobody calls osc
2408 methods directly and everybody is supposed to go through LOV, we
2409 assume lov checked invalid values for us.
2410 The only recognised values so far are evict_by_nid and mds_conn.
2411 Even if something bad goes through, we'd get a -EINVAL from OST
2414 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2415 &RQF_OST_SET_GRANT_INFO :
2420 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2421 RCL_CLIENT, keylen);
2422 if (!KEY_IS(KEY_GRANT_SHRINK))
2423 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2424 RCL_CLIENT, vallen);
2425 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2427 ptlrpc_request_free(req);
/* copy key (and value, for the non-grant-shrink layout) into the req */
2431 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2432 memcpy(tmp, key, keylen);
2433 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2436 memcpy(tmp, val, vallen);
2438 if (KEY_IS(KEY_GRANT_SHRINK)) {
2439 struct osc_grant_args *aa;
2442 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2443 aa = ptlrpc_req_async_args(req);
2446 ptlrpc_req_finished(req);
2449 *oa = ((struct ost_body *)val)->oa;
2451 req->rq_interpret_reply = osc_shrink_grant_interpret;
2454 ptlrpc_request_set_replen(req);
2455 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2456 LASSERT(set != NULL);
2457 ptlrpc_set_add_req(set, req);
2458 ptlrpc_check_set(NULL, set);
2460 ptlrpcd_add_req(req);
/*
 * osc_reconnect() - obd reconnect hook.
 *
 * If the server supports grants (OBD_CONNECT_GRANT), recompute the
 * amount of grant to request in the connect RPC under cl_loi_list_lock:
 * available + reserved grant, plus dirty-grant or dirty-pages-in-bytes
 * depending on whether OBD_CONNECT_GRANT_PARAM is negotiated.  A zero
 * total falls back to 2 * cli_brw_size(obd).  Lost grant is reported
 * and reset to zero.
 */
2466 static int osc_reconnect(const struct lu_env *env,
2467 struct obd_export *exp, struct obd_device *obd,
2468 struct obd_uuid *cluuid,
2469 struct obd_connect_data *data,
2472 struct client_obd *cli = &obd->u.cli;
2474 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2478 spin_lock(&cli->cl_loi_list_lock);
2479 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2480 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2481 grant += cli->cl_dirty_grant;
/* Without GRANT_PARAM, dirty accounting is in whole pages. */
2483 grant += cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
2484 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2485 lost_grant = cli->cl_lost_grant;
2486 cli->cl_lost_grant = 0;
2487 spin_unlock(&cli->cl_loi_list_lock);
2489 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2490 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2491 data->ocd_version, data->ocd_grant, lost_grant);
/*
 * osc_disconnect() - obd disconnect hook.
 *
 * Disconnects the client export first, and only removes this client
 * from the grant-shrink list once the import is known to be gone
 * (cl_import == NULL), to avoid the pinger racing with cleanup as
 * described in the BUG18662 comment below.
 */
2497 static int osc_disconnect(struct obd_export *exp)
2499 struct obd_device *obd = class_exp2obd(exp);
2502 rc = client_disconnect_export(exp);
2504 * Initially we put del_shrink_grant before disconnect_export, but it
2505 * causes the following problem if setup (connect) and cleanup
2506 * (disconnect) are tangled together.
2507 * connect p1 disconnect p2
2508 * ptlrpc_connect_import
2509 * ............... class_manual_cleanup
2512 * ptlrpc_connect_interrupt
2514 * add this client to shrink list
2516 * Bang! pinger trigger the shrink.
2517 * So the osc should be disconnected from the shrink list, after we
2518 * are sure the import has been destroyed. BUG18662
2520 if (obd->u.cli.cl_import == NULL)
2521 osc_del_shrink_grant(&obd->u.cli);
/*
 * osc_ldlm_resource_invalidate() - cfs_hash iterator callback, invoked
 * for every LDLM resource during IMP_EVENT_INVALIDATE handling (see
 * osc_import_event() below).
 *
 * Walks the resource's granted lock list: remembers the first osc
 * object attached via l_ast_data (taking a cl_object reference), and
 * clears LDLM_FL_CLEANED on every granted lock so the second
 * ldlm_namespace_cleanup() pass will cancel it.  The remembered object
 * is then invalidated and the reference dropped.
 *
 * \param arg  the lu_env passed by cfs_hash_for_each_nolock()
 */
2525 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2526 struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2528 struct lu_env *env = arg;
2529 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2530 struct ldlm_lock *lock;
2531 struct osc_object *osc = NULL;
2535 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2536 if (lock->l_ast_data != NULL && osc == NULL) {
2537 osc = lock->l_ast_data;
2538 cl_object_get(osc2cl(osc));
2541 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2542 * by the 2nd round of ldlm_namespace_clean() call in
2543 * osc_import_event(). */
2544 ldlm_clear_cleaned(lock);
2549 osc_object_invalidate(env, osc);
2550 cl_object_put(env, osc2cl(osc));
/*
 * osc_import_event() - react to import state changes for this OSC.
 *
 * DISCON:      drop all grant accounting (server forgot it).
 * INACTIVE/ACTIVE/ACTIVATE/DEACTIVATE: forward to the obd observer.
 * INVALIDATE:  clean the namespace, flush pending I/O, invalidate all
 *              osc objects via osc_ldlm_resource_invalidate(), then
 *              clean the namespace again (2nd pass cancels locks whose
 *              CLEANED flag was reset by the iterator).
 * OCD:         re-init grants and switch to the OST request portal
 *              according to the negotiated connect flags.
 *
 * NOTE(review): break statements and closing braces were dropped by
 * extraction -- partial listing.
 */
2556 static int osc_import_event(struct obd_device *obd,
2557 struct obd_import *imp,
2558 enum obd_import_event event)
2560 struct client_obd *cli;
2564 LASSERT(imp->imp_obd == obd);
2567 case IMP_EVENT_DISCON: {
2569 spin_lock(&cli->cl_loi_list_lock);
2570 cli->cl_avail_grant = 0;
2571 cli->cl_lost_grant = 0;
2572 spin_unlock(&cli->cl_loi_list_lock);
2575 case IMP_EVENT_INACTIVE: {
2576 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2579 case IMP_EVENT_INVALIDATE: {
2580 struct ldlm_namespace *ns = obd->obd_namespace;
/* First pass: cancel local-only locks. */
2584 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2586 env = cl_env_get(&refcheck);
2588 osc_io_unplug(env, &obd->u.cli, NULL);
2590 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2591 osc_ldlm_resource_invalidate,
2593 cl_env_put(env, &refcheck);
/* Second pass: cancel locks un-CLEANED by the iterator. */
2595 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2600 case IMP_EVENT_ACTIVE: {
2601 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2604 case IMP_EVENT_OCD: {
2605 struct obd_connect_data *ocd = &imp->imp_connect_data;
2607 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2608 osc_init_grant(&obd->u.cli, ocd);
2611 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2612 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2614 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2617 case IMP_EVENT_DEACTIVATE: {
2618 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2621 case IMP_EVENT_ACTIVATE: {
2622 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2626 CERROR("Unknown import event %d\n", event);
2633 * Determine whether the lock can be canceled before replaying the lock
2634 * during recovery, see bug16774 for detailed information.
2636 * \retval zero the lock can't be canceled
2637 * \retval other ok to cancel
2639 static int osc_cancel_weight(struct ldlm_lock *lock)
2642 * Cancel all unused and granted extent lock.
/* Extent lock, fully granted, and weightless (no pages pinned by it per
 * osc_ldlm_weigh_ast) => safe to cancel instead of replay. */
2644 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2645 lock->l_granted_mode == lock->l_req_mode &&
2646 osc_ldlm_weigh_ast(lock) == 0)
/*
 * brw_queue_work() - ptlrpcd work callback for the writeback work item
 * created in osc_setup(); flushes any pending cached I/O for this
 * client via osc_io_unplug().
 *
 * \param data  the client_obd this work item was allocated for
 */
2652 static int brw_queue_work(const struct lu_env *env, void *data)
2654 struct client_obd *cli = data;
2656 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2658 osc_io_unplug(env, cli, NULL);
/*
 * osc_setup() - obd setup hook for the OSC device.
 *
 * Order of initialization:
 *  1. take a ptlrpcd reference and run generic client_obd_setup();
 *  2. allocate ptlrpcd work items for writeback and LRU shrinking;
 *  3. set up quota, grant-shrink interval and (optionally) procfs --
 *     attaching under the OSP proc symlink when osc and osp share a
 *     node (see comment below);
 *  4. grow the shared OSC request pool toward osc_reqpool_maxreqcount
 *     (racy over-allocation tolerated, see comment);
 *  5. register the cancel-weight callback and join the global
 *     osc_shrink_list used by the memory shrinker.
 *
 * Error unwinding (bottom of block) destroys the work items and runs
 * client_obd_cleanup().  NOTE(review): error-check lines and labels
 * were dropped by extraction -- partial listing.
 */
2662 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2664 struct client_obd *cli = &obd->u.cli;
2665 struct obd_type *type;
2673 rc = ptlrpcd_addref();
2677 rc = client_obd_setup(obd, lcfg);
2679 GOTO(out_ptlrpcd, rc);
2681 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2682 if (IS_ERR(handler))
2683 GOTO(out_client_setup, rc = PTR_ERR(handler));
2684 cli->cl_writeback_work = handler;
2686 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2687 if (IS_ERR(handler))
2688 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2689 cli->cl_lru_work = handler;
2691 rc = osc_quota_setup(obd);
2693 GOTO(out_ptlrpcd_work, rc);
2695 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2697 #ifdef CONFIG_PROC_FS
2698 obd->obd_vars = lprocfs_osc_obd_vars;
2700 /* If this is true then both client (osc) and server (osp) are on the
2701 * same node. The osp layer if loaded first will register the osc proc
2702 * directory. In that case this obd_device will be attached its proc
2703 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2704 type = class_search_type(LUSTRE_OSP_NAME);
2705 if (type && type->typ_procsym) {
2706 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2708 obd->obd_vars, obd);
2709 if (IS_ERR(obd->obd_proc_entry)) {
2710 rc = PTR_ERR(obd->obd_proc_entry);
2711 CERROR("error %d setting up lprocfs for %s\n", rc,
/* proc failure is non-fatal: clear the entry and carry on. */
2713 obd->obd_proc_entry = NULL;
2716 rc = lprocfs_obd_setup(obd);
2719 /* If the basic OSC proc tree construction succeeded then
2720 * lets do the rest. */
2722 lproc_osc_attach_seqstat(obd);
2723 sptlrpc_lprocfs_cliobd_attach(obd);
2724 ptlrpc_lprocfs_register_obd(obd);
2728 * We try to control the total number of requests with a upper limit
2729 * osc_reqpool_maxreqcount. There might be some race which will cause
2730 * over-limit allocation, but it is fine.
2732 req_count = atomic_read(&osc_pool_req_count);
2733 if (req_count < osc_reqpool_maxreqcount) {
2734 adding = cli->cl_max_rpcs_in_flight + 2;
2735 if (req_count + adding > osc_reqpool_maxreqcount)
2736 adding = osc_reqpool_maxreqcount - req_count;
2738 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2739 atomic_add(added, &osc_pool_req_count);
2742 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2743 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2745 spin_lock(&osc_shrink_lock);
2746 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2747 spin_unlock(&osc_shrink_lock);
/* --- error unwinding below (labels dropped by extraction) --- */
2752 if (cli->cl_writeback_work != NULL) {
2753 ptlrpcd_destroy_work(cli->cl_writeback_work);
2754 cli->cl_writeback_work = NULL;
2756 if (cli->cl_lru_work != NULL) {
2757 ptlrpcd_destroy_work(cli->cl_lru_work);
2758 cli->cl_lru_work = NULL;
2761 client_obd_cleanup(obd);
/*
 * osc_precleanup() - obd pre-cleanup hook.
 *
 * Waits for the zombie-export thread (see comment below on the echo
 * client path), destroys the writeback and LRU ptlrpcd work items,
 * drops the client import, and tears down procfs registration.
 */
2767 static int osc_precleanup(struct obd_device *obd)
2769 struct client_obd *cli = &obd->u.cli;
2773 * for echo client, export may be on zombie list, wait for
2774 * zombie thread to cull it, because cli.cl_import will be
2775 * cleared in client_disconnect_export():
2776 * class_export_destroy() -> obd_cleanup() ->
2777 * echo_device_free() -> echo_client_cleanup() ->
2778 * obd_disconnect() -> osc_disconnect() ->
2779 * client_disconnect_export()
2781 obd_zombie_barrier();
2782 if (cli->cl_writeback_work) {
2783 ptlrpcd_destroy_work(cli->cl_writeback_work);
2784 cli->cl_writeback_work = NULL;
2787 if (cli->cl_lru_work) {
2788 ptlrpcd_destroy_work(cli->cl_lru_work);
2789 cli->cl_lru_work = NULL;
2792 obd_cleanup_client_import(obd);
2793 ptlrpc_lprocfs_unregister_obd(obd);
2794 lprocfs_obd_cleanup(obd);
/*
 * osc_cleanup() - final obd cleanup hook.
 *
 * Unlinks this client from the global shrink list, detaches it from
 * the shared page cache (dropping the reference taken in
 * osc_set_info_async()/KEY_CACHE_SET), frees the quota cache, and runs
 * generic client cleanup.
 */
2798 int osc_cleanup(struct obd_device *obd)
2800 struct client_obd *cli = &obd->u.cli;
2805 spin_lock(&osc_shrink_lock);
2806 list_del(&cli->cl_shrink_list);
2807 spin_unlock(&osc_shrink_lock);
2810 if (cli->cl_cache != NULL) {
2811 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2812 spin_lock(&cli->cl_cache->ccc_lru_lock);
2813 list_del_init(&cli->cl_lru_osc);
2814 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2815 cli->cl_lru_left = NULL;
2816 cl_cache_decref(cli->cl_cache);
2817 cli->cl_cache = NULL;
2820 /* free memory of osc quota cache */
2821 osc_quota_cleanup(obd);
2823 rc = client_obd_cleanup(obd);
/*
 * osc_process_config_base() - apply a PARAM_OSC proc-style parameter
 * from a config log.  class_process_proc_param() returns > 0 when the
 * parameter was consumed; normalize that to 0 for the caller.
 */
2829 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2831 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2832 return rc > 0 ? 0: rc;
/* obd_ops wrapper: forward config records to osc_process_config_base(). */
2835 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2837 return osc_process_config_base(obd, buf);
/*
 * obd method table for the OSC device type, registered with
 * class_register_type() in osc_init() below.  Connection management
 * is delegated to the generic client_* helpers; everything else is
 * implemented in this file or its siblings.
 */
2840 static struct obd_ops osc_obd_ops = {
2841 .o_owner = THIS_MODULE,
2842 .o_setup = osc_setup,
2843 .o_precleanup = osc_precleanup,
2844 .o_cleanup = osc_cleanup,
2845 .o_add_conn = client_import_add_conn,
2846 .o_del_conn = client_import_del_conn,
2847 .o_connect = client_connect_import,
2848 .o_reconnect = osc_reconnect,
2849 .o_disconnect = osc_disconnect,
2850 .o_statfs = osc_statfs,
2851 .o_statfs_async = osc_statfs_async,
2852 .o_create = osc_create,
2853 .o_destroy = osc_destroy,
2854 .o_getattr = osc_getattr,
2855 .o_setattr = osc_setattr,
2856 .o_iocontrol = osc_iocontrol,
2857 .o_set_info_async = osc_set_info_async,
2858 .o_import_event = osc_import_event,
2859 .o_process_config = osc_process_config,
2860 .o_quotactl = osc_quotactl,
/* Memory shrinker registered in osc_init(); osc_shrink_list holds every
 * live client_obd (added in osc_setup(), removed in osc_cleanup()) and
 * is protected by osc_shrink_lock. */
2863 static struct shrinker *osc_cache_shrinker;
2864 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
2865 DEFINE_SPINLOCK(osc_shrink_lock);
#ifndef HAVE_SHRINKER_COUNT
/*
 * osc_cache_shrink() - compatibility shim for kernels whose shrinker
 * API has a single combined callback instead of separate
 * count/scan methods: build a shrink_control from the old-style
 * arguments, run the scan, and return the remaining count.
 */
2868 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2870 struct shrink_control scv = {
2871 .nr_to_scan = shrink_param(sc, nr_to_scan),
2872 .gfp_mask = shrink_param(sc, gfp_mask)
2874 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2875 struct shrinker *shrinker = NULL;
2878 (void)osc_cache_shrink_scan(shrinker, &scv);
2880 return osc_cache_shrink_count(shrinker, &scv);
/*
 * osc_init() - module entry point.
 *
 * Initializes the lu_kmem caches, registers the OSC obd type (proc
 * disabled when the OSP type already provides the proc symlink),
 * installs the cache shrinker, and sizes the shared request pool:
 * osc_reqpool_mem_max MB (module parameter, sanity-checked against
 * overflow) divided by the power-of-two request size that covers
 * OST_IO_MAXREQSIZE gives osc_reqpool_maxreqcount.
 *
 * NOTE(review): several lines (rc declarations, error checks, RETURNs,
 * the reqsize seed value) were dropped by extraction -- partial
 * listing; the unwind path below unregisters the type and frees the
 * caches.
 */
2884 static int __init osc_init(void)
2886 bool enable_proc = true;
2887 struct obd_type *type;
2888 unsigned int reqpool_size;
2889 unsigned int reqsize;
2891 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
2892 osc_cache_shrink_count, osc_cache_shrink_scan);
2895 /* print an address of _any_ initialized kernel symbol from this
2896 * module, to allow debugging with gdb that doesn't support data
2897 * symbols from modules.*/
2898 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2900 rc = lu_kmem_init(osc_caches);
2904 type = class_search_type(LUSTRE_OSP_NAME);
2905 if (type != NULL && type->typ_procsym != NULL)
2906 enable_proc = false;
2908 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2909 LUSTRE_OSC_NAME, &osc_device_type);
2913 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
2915 /* This is obviously too much memory, only prevent overflow here */
2916 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
2917 GOTO(out_type, rc = -EINVAL);
2919 reqpool_size = osc_reqpool_mem_max << 20;
/* Round the per-request size up to a power of two. */
2922 while (reqsize < OST_IO_MAXREQSIZE)
2923 reqsize = reqsize << 1;
2926 * We don't enlarge the request count in OSC pool according to
2927 * cl_max_rpcs_in_flight. The allocation from the pool will only be
2928 * tried after normal allocation failed. So a small OSC pool won't
2929 * cause much performance degression in most of cases.
2931 osc_reqpool_maxreqcount = reqpool_size / reqsize;
2933 atomic_set(&osc_pool_req_count, 0);
2934 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
2935 ptlrpc_add_rqs_to_pool);
2937 if (osc_rq_pool != NULL)
/* --- error unwinding below (labels dropped by extraction) --- */
2941 class_unregister_type(LUSTRE_OSC_NAME);
2943 lu_kmem_fini(osc_caches);
/*
 * osc_exit() - module exit point; tears down everything osc_init()
 * created, in reverse order: shrinker, obd type, kmem caches, and the
 * shared request pool.
 */
2948 static void __exit osc_exit(void)
2950 remove_shrinker(osc_cache_shrinker);
2951 class_unregister_type(LUSTRE_OSC_NAME);
2952 lu_kmem_fini(osc_caches);
2953 ptlrpc_free_rq_pool(osc_rq_pool);
2956 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2957 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2958 MODULE_VERSION(LUSTRE_VERSION_STRING);
2959 MODULE_LICENSE("GPL");
2961 module_init(osc_init);
2962 module_exit(osc_exit);