4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2015, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
41 #include <lustre/lustre_user.h>
43 #include <lprocfs_status.h>
44 #include <lustre_debug.h>
45 #include <lustre_dlm.h>
46 #include <lustre_fid.h>
47 #include <lustre_ha.h>
48 #include <lustre_ioctl.h>
49 #include <lustre_net.h>
50 #include <lustre_obdo.h>
51 #include <lustre_param.h>
53 #include <obd_cksum.h>
54 #include <obd_class.h>
56 #include "osc_cl_internal.h"
57 #include "osc_internal.h"
/* Shared OSC request pool state: a pre-allocated pool of ptlrpc requests so
 * that BRW writes can proceed under memory pressure.  Presumably sized at
 * module init from osc_reqpool_mem_max — TODO confirm against init code. */
59 atomic_t osc_pool_req_count;
60 unsigned int osc_reqpool_maxreqcount;
61 struct ptlrpc_request_pool *osc_rq_pool;
63 /* max memory used for request pool, unit is MB */
64 static unsigned int osc_reqpool_mem_max = 5;
/* read-only module parameter (0444): tunable only at load time */
65 module_param(osc_reqpool_mem_max, uint, 0444);
/* Per-request context for async BRW (bulk read/write) RPCs, stashed in
 * ptlrpc_request::rq_async_args and read back in brw_interpret().
 * NOTE(review): some members are elided in this extract. */
67 struct osc_brw_async_args {
73 struct brw_page **aa_ppga;	/* page array the RPC covers */
74 struct client_obd *aa_cli;	/* owning client obd */
75 struct list_head aa_oaps;	/* osc_async_pages queued in this RPC */
76 struct list_head aa_exts;	/* osc_extents covered by this RPC */
/* grant-shrink RPCs reuse the same async-args layout */
79 #define osc_grant_args osc_brw_async_args
/* Async-args for setattr/punch RPCs; consumed by osc_setattr_interpret().
 * NOTE(review): sa_oa/sa_cookie members appear elided in this extract. */
81 struct osc_setattr_args {
83 obd_enqueue_update_f sa_upcall;	/* completion callback */
/* Async-args for OST_SYNC RPCs; consumed by osc_sync_interpret(). */
87 struct osc_fsync_args {
88 struct osc_object *fa_obj;	/* object being synced */
90 obd_enqueue_update_f fa_upcall;	/* completion callback */
/* Async-args for DLM lock enqueue RPCs issued by the OSC. */
94 struct osc_enqueue_args {
95 struct obd_export *oa_exp;	/* export the enqueue was sent on */
96 enum ldlm_type oa_type;		/* extent/flock lock type */
97 enum ldlm_mode oa_mode;		/* requested lock mode */
99 osc_enqueue_upcall_f oa_upcall;	/* completion callback */
101 struct ost_lvb *oa_lvb;		/* lock value block from server */
102 struct lustre_handle oa_lockh;	/* handle of the granted lock */
103 unsigned int oa_agl:1;		/* asynchronous glimpse lock request */
/* Forward declarations for helpers defined later in this file. */
106 static void osc_release_ppga(struct brw_page **ppga, size_t count);
107 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/* Pack @oa into the OST body of @req's request buffer, converting it to
 * wire format according to the import's negotiated connect data. */
110 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
112 struct ost_body *body;
114 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
117 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/* Synchronous OST_GETATTR: send @oa's identity to the OST, wait for the
 * reply, and copy the returned attributes back into @oa.  Also fills in
 * the client-side blksize since the server does not provide it.
 * Returns 0 or a negative errno (-EPROTO on an unparseable reply). */
120 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
123 struct ptlrpc_request *req;
124 struct ost_body *body;
128 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
132 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
134 ptlrpc_request_free(req);
138 osc_pack_req_body(req, oa);
140 ptlrpc_request_set_replen(req);
/* blocking wait for the RPC round trip */
142 rc = ptlrpc_queue_wait(req);
146 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
148 GOTO(out, rc = -EPROTO);
150 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
151 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* blksize is a client-side notion (max BRW size), not stored on the OST */
153 oa->o_blksize = cli_brw_size(exp->exp_obd);
154 oa->o_valid |= OBD_MD_FLBLKSZ;
158 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: push the attributes in @oa to the OST and copy
 * the server's view back into @oa on success.  The caller must have set
 * the object group (OBD_MD_FLGROUP) in oa->o_valid. */
163 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
166 struct ptlrpc_request *req;
167 struct ost_body *body;
171 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
173 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
177 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
179 ptlrpc_request_free(req);
183 osc_pack_req_body(req, oa);
185 ptlrpc_request_set_replen(req);
187 rc = ptlrpc_queue_wait(req);
191 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
193 GOTO(out, rc = -EPROTO);
195 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
199 ptlrpc_req_finished(req);
/* Reply interpreter shared by async setattr and punch RPCs: unpack the
 * reply body into sa->sa_oa, then invoke the caller's upcall with the
 * final rc.  Runs in ptlrpcd context. */
204 static int osc_setattr_interpret(const struct lu_env *env,
205 struct ptlrpc_request *req,
206 struct osc_setattr_args *sa, int rc)
208 struct ost_body *body;
214 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
216 GOTO(out, rc = -EPROTO);
218 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
/* hand the result to the waiter regardless of success/failure */
221 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR.  Packs @oa, registers osc_setattr_interpret()
 * with @upcall/@cookie in the async args, and dispatches the request
 * either via ptlrpcd (fire-and-forget / PTLRPCD_SET) or by adding it to
 * the caller-supplied @rqset.
 * NOTE(review): the branch structure around the two ptlrpcd_add_req()
 * calls is partially elided in this extract — presumably an
 * "if (rqset == NULL)" path precedes the first one; verify in full source. */
225 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
226 obd_enqueue_update_f upcall, void *cookie,
227 struct ptlrpc_request_set *rqset)
229 struct ptlrpc_request *req;
230 struct osc_setattr_args *sa;
235 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
239 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
241 ptlrpc_request_free(req);
245 osc_pack_req_body(req, oa);
247 ptlrpc_request_set_replen(req);
249 /* do mds to ost setattr asynchronously */
251 /* Do not wait for response. */
252 ptlrpcd_add_req(req);
254 req->rq_interpret_reply =
255 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* async args live inside the request itself; assert they fit */
257 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
258 sa = ptlrpc_req_async_args(req);
260 sa->sa_upcall = upcall;
261 sa->sa_cookie = cookie;
263 if (rqset == PTLRPCD_SET)
264 ptlrpcd_add_req(req);
266 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_CREATE.  Only used for echo-client objects — the
 * fid_seq_is_echo() assertion enforces that regular object creation goes
 * through other paths.  On success the server-assigned attributes are
 * copied back into @oa and the client blksize is filled in. */
272 static int osc_create(const struct lu_env *env, struct obd_export *exp,
275 struct ptlrpc_request *req;
276 struct ost_body *body;
281 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
282 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
284 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
286 GOTO(out, rc = -ENOMEM);
288 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
290 ptlrpc_request_free(req);
294 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
297 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
299 ptlrpc_request_set_replen(req);
301 rc = ptlrpc_queue_wait(req);
305 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
307 GOTO(out_req, rc = -EPROTO);
309 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
310 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* blksize is a client-side notion (max BRW size) */
312 oa->o_blksize = cli_brw_size(exp->exp_obd);
313 oa->o_valid |= OBD_MD_FLBLKSZ;
315 CDEBUG(D_HA, "transno: "LPD64"\n",
316 lustre_msg_get_transno(req->rq_repmsg));
318 ptlrpc_req_finished(req);
/* Asynchronous OST_PUNCH (truncate a byte range on the OST).  The punch
 * range is carried inside @oa; completion is reported through
 * osc_setattr_interpret() -> @upcall(@cookie, rc).  Dispatched via
 * ptlrpcd for PTLRPCD_SET, otherwise added to @rqset. */
323 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
324 obd_enqueue_update_f upcall, void *cookie,
325 struct ptlrpc_request_set *rqset)
327 struct ptlrpc_request *req;
328 struct osc_setattr_args *sa;
329 struct ost_body *body;
333 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
337 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
339 ptlrpc_request_free(req);
/* punch goes to the I/O portal, not the metadata portal */
342 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
343 ptlrpc_at_set_req_timeout(req);
345 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
347 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
349 ptlrpc_request_set_replen(req);
351 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
352 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
353 sa = ptlrpc_req_async_args(req);
355 sa->sa_upcall = upcall;
356 sa->sa_cookie = cookie;
357 if (rqset == PTLRPCD_SET)
358 ptlrpcd_add_req(req);
360 ptlrpc_set_add_req(rqset, req);
/* Reply interpreter for OST_SYNC: copy the returned obdo to the caller's
 * buffer, refresh the osc object's cached blocks attribute under the
 * attr lock, then invoke the upcall with the final rc. */
365 static int osc_sync_interpret(const struct lu_env *env,
366 struct ptlrpc_request *req,
369 struct osc_fsync_args *fa = arg;
370 struct ost_body *body;
371 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
372 unsigned long valid = 0;
373 struct cl_object *obj;
379 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
381 CERROR("can't unpack ost_body\n");
382 GOTO(out, rc = -EPROTO);
/* struct copy of the whole reply obdo back to the caller */
385 *fa->fa_oa = body->oa;
386 obj = osc2cl(fa->fa_obj);
388 /* Update osc object's blocks attribute */
389 cl_object_attr_lock(obj);
390 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
391 attr->cat_blocks = body->oa.o_blocks;
396 cl_object_attr_update(env, obj, attr, valid);
397 cl_object_attr_unlock(obj);
400 rc = fa->fa_upcall(fa->fa_cookie, rc);
/* Asynchronous OST_SYNC for @obj.  The sync byte range is encoded in the
 * oa's size/blocks fields (see comment below).  Completion is routed via
 * osc_sync_interpret() -> @upcall(@cookie, rc). */
404 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
405 obd_enqueue_update_f upcall, void *cookie,
406 struct ptlrpc_request_set *rqset)
408 struct obd_export *exp = osc_export(obj);
409 struct ptlrpc_request *req;
410 struct ost_body *body;
411 struct osc_fsync_args *fa;
415 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
419 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
421 ptlrpc_request_free(req);
425 /* overload the size and blocks fields in the oa with start/end */
426 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
428 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
430 ptlrpc_request_set_replen(req);
431 req->rq_interpret_reply = osc_sync_interpret;
433 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
434 fa = ptlrpc_req_async_args(req);
437 fa->fa_upcall = upcall;
438 fa->fa_cookie = cookie;
440 if (rqset == PTLRPCD_SET)
441 ptlrpcd_add_req(req);
443 ptlrpc_set_add_req(rqset, req);
448 /* Find and cancel locally locks matched by @mode in the resource found by
449 * @objid. Found locks are added into @cancel list. Returns the amount of
450 * locks added to @cancels list. */
/* Find and cancel (locally) DLM locks matching @mode on the resource named
 * by @oa's object id; matched locks are collected on @cancels for ELC
 * (early lock cancel) piggy-backing on a subsequent RPC.
 * Returns the number of locks added to @cancels (0 when ELC is supported
 * by the export but administratively disabled in the namespace). */
451 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
452 struct list_head *cancels,
453 enum ldlm_mode mode, __u64 lock_flags)
455 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
456 struct ldlm_res_id res_id;
457 struct ldlm_resource *res;
461 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
462 * export) but disabled through procfs (flag in NS).
464 * This distinguishes from a case when ELC is not supported originally,
465 * when we still want to cancel locks in advance and just cancel them
466 * locally, without sending any RPC. */
467 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
470 ostid_build_res_name(&oa->o_oi, &res_id);
471 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
475 LDLM_RESOURCE_ADDREF(res);
476 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
477 lock_flags, 0, NULL);
478 LDLM_RESOURCE_DELREF(res);
479 ldlm_resource_putref(res);
/* Reply interpreter for OST_DESTROY: release one in-flight-destroy slot
 * and wake any sender throttled in osc_destroy(). */
483 static int osc_destroy_interpret(const struct lu_env *env,
484 struct ptlrpc_request *req, void *data,
487 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
489 atomic_dec(&cli->cl_destroy_in_flight);
490 wake_up(&cli->cl_destroy_waitq);
/* Try to reserve an in-flight-destroy slot, bounded by
 * cl_max_rpcs_in_flight.  On failure the optimistic increment is undone;
 * the re-check after the decrement closes the race with a concurrent
 * completion so no waiter is left sleeping. */
494 static int osc_can_send_destroy(struct client_obd *cli)
496 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
497 cli->cl_max_rpcs_in_flight) {
498 /* The destroy request can be sent */
501 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
502 cli->cl_max_rpcs_in_flight) {
504 * The counter has been modified between the two atomic
507 wake_up(&cli->cl_destroy_waitq);
/* Destroy an OST object.  Locally cancels matching PW locks first (ELC,
 * with LDLM_FL_DISCARD_DATA so dirty pages are dropped rather than
 * flushed), throttles against cl_max_rpcs_in_flight via
 * osc_can_send_destroy(), then fires the OST_DESTROY RPC through ptlrpcd
 * without waiting for the reply. */
512 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
515 struct client_obd *cli = &exp->exp_obd->u.cli;
516 struct ptlrpc_request *req;
517 struct ost_body *body;
518 struct list_head cancels = LIST_HEAD_INIT(cancels);
523 CDEBUG(D_INFO, "oa NULL\n");
527 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
528 LDLM_FL_DISCARD_DATA);
530 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* on alloc failure the collected cancel locks must be released */
532 ldlm_lock_list_put(&cancels, l_bl_ast, count);
536 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
539 ptlrpc_request_free(req);
543 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
544 ptlrpc_at_set_req_timeout(req);
546 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
548 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
550 ptlrpc_request_set_replen(req);
552 req->rq_interpret_reply = osc_destroy_interpret;
553 if (!osc_can_send_destroy(cli)) {
554 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
557 * Wait until the number of on-going destroy RPCs drops
558 * under max_rpc_in_flight
560 l_wait_event_exclusive(cli->cl_destroy_waitq,
561 osc_can_send_destroy(cli), &lwi);
564 /* Do not wait for response */
565 ptlrpcd_add_req(req);
/* Fill @oa's dirty/undirty/grant/dropped fields so the server learns this
 * client's cache state and grant needs.  All accounting is read under
 * cl_loi_list_lock.  The CERROR branches are sanity checks on the dirty
 * accounting, padded with small fudge factors where counters are updated
 * without a common lock. */
569 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
572 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
574 LASSERT(!(oa->o_valid & bits));
577 spin_lock(&cli->cl_loi_list_lock);
/* with GRANT_PARAM the server wants byte-accurate grant consumption */
578 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
579 oa->o_dirty = cli->cl_dirty_grant;
581 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
582 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
583 cli->cl_dirty_max_pages)) {
584 CERROR("dirty %lu - %lu > dirty_max %lu\n",
585 cli->cl_dirty_pages, cli->cl_dirty_transit,
586 cli->cl_dirty_max_pages);
588 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
589 atomic_long_read(&obd_dirty_transit_pages) >
590 (long)(obd_max_dirty_pages + 1))) {
591 /* The atomic_read() allowing the atomic_inc() are
592 * not covered by a lock thus they may safely race and trip
593 * this CERROR() unless we add in a small fudge factor (+1). */
594 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
595 cli_name(cli), atomic_long_read(&obd_dirty_pages),
596 atomic_long_read(&obd_dirty_transit_pages),
597 obd_max_dirty_pages);
599 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
601 CERROR("dirty %lu - dirty_max %lu too big???\n",
602 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
605 unsigned long nrpages;
/* ask for enough grant to keep max_rpcs_in_flight+1 RPCs in flight */
607 nrpages = cli->cl_max_pages_per_rpc;
608 nrpages *= cli->cl_max_rpcs_in_flight + 1;
609 nrpages = max(nrpages, cli->cl_dirty_max_pages);
610 oa->o_undirty = nrpages << PAGE_CACHE_SHIFT;
611 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
615 /* take extent tax into account when asking for more
617 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
618 cli->cl_max_extent_pages;
619 oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
622 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
623 oa->o_dropped = cli->cl_lost_grant;
624 cli->cl_lost_grant = 0;
625 spin_unlock(&cli->cl_loi_list_lock);
626 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
627 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant-shrink attempt cl_grant_shrink_interval
 * seconds from now. */
630 void osc_update_next_shrink(struct client_obd *cli)
632 cli->cl_next_shrink_grant =
633 cfs_time_shift(cli->cl_grant_shrink_interval);
634 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
635 cli->cl_next_shrink_grant);
/* Add @grant bytes to the client's available grant, under the
 * cl_loi_list_lock that protects all grant accounting. */
638 static void __osc_update_grant(struct client_obd *cli, u64 grant)
640 spin_lock(&cli->cl_loi_list_lock);
641 cli->cl_avail_grant += grant;
642 spin_unlock(&cli->cl_loi_list_lock);
/* Apply any extra grant the server returned in an RPC reply body. */
645 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
647 if (body->oa.o_valid & OBD_MD_FLGRANT) {
648 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
649 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: needed by osc_shrink_grant_to_target() below,
 * which sends the grant-shrink key before the function is defined. */
653 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
654 u32 keylen, void *key,
655 u32 vallen, void *val,
656 struct ptlrpc_request_set *set);
/* Reply interpreter for a grant-shrink RPC.  On failure the grant we
 * offered back in oa->o_grant is restored locally; on success any grant
 * the server returned in the reply body is applied. */
658 static int osc_shrink_grant_interpret(const struct lu_env *env,
659 struct ptlrpc_request *req,
662 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
663 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
664 struct ost_body *body;
/* error path (elided branch above): give the offered grant back */
667 __osc_update_grant(cli, oa->o_grant);
671 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
673 osc_update_grant(cli, body);
/* Piggy-back a grant shrink on an outgoing BRW: move a quarter of the
 * available grant into oa->o_grant, flag it with OBD_FL_SHRINK_GRANT,
 * and push the next scheduled shrink further out. */
679 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
681 spin_lock(&cli->cl_loi_list_lock);
682 oa->o_grant = cli->cl_avail_grant / 4;
683 cli->cl_avail_grant -= oa->o_grant;
684 spin_unlock(&cli->cl_loi_list_lock);
685 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
686 oa->o_valid |= OBD_MD_FLFLAGS;
689 oa->o_flags |= OBD_FL_SHRINK_GRANT;
690 osc_update_next_shrink(cli);
693 /* Shrink the current grant, either from some large amount to enough for a
694 * full set of in-flight RPCs, or if we have already shrunk to that limit
695 * then to enough for a single RPC. This avoids keeping more grant than
696 * needed, and avoids shrinking the grant piecemeal. */
/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
697 static int osc_shrink_grant(struct client_obd *cli)
699 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
700 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
702 spin_lock(&cli->cl_loi_list_lock);
/* already at/below the in-flight target: fall back to single-RPC size */
703 if (cli->cl_avail_grant <= target_bytes)
704 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
705 spin_unlock(&cli->cl_loi_list_lock);
707 return osc_shrink_grant_to_target(cli, target_bytes);
/* Return grant above @target_bytes to the server via a KEY_GRANT_SHRINK
 * set_info RPC.  The target is clamped to at least one RPC's worth of
 * grant; nothing is sent when we are already at or below the target.
 * On send failure the grant is restored locally. */
710 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
713 struct ost_body *body;
716 spin_lock(&cli->cl_loi_list_lock);
717 /* Don't shrink if we are already above or below the desired limit
718 * We don't want to shrink below a single RPC, as that will negatively
719 * impact block allocation and long-term performance. */
720 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
721 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
723 if (target_bytes >= cli->cl_avail_grant) {
724 spin_unlock(&cli->cl_loi_list_lock);
727 spin_unlock(&cli->cl_loi_list_lock);
733 osc_announce_cached(cli, &body->oa, 0);
735 spin_lock(&cli->cl_loi_list_lock);
/* offer everything above the target back to the server */
736 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
737 cli->cl_avail_grant = target_bytes;
738 spin_unlock(&cli->cl_loi_list_lock);
739 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
740 body->oa.o_valid |= OBD_MD_FLFLAGS;
741 body->oa.o_flags = 0;
743 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
744 osc_update_next_shrink(cli);
746 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
747 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
748 sizeof(*body), body, NULL);
/* on failure (elided check above) reclaim the grant we tried to return */
750 __osc_update_grant(cli, body->oa.o_grant);
/* Decide whether it is time to shrink this client's grant: the server
 * must support OBD_CONNECT_GRANT_SHRINK, the scheduled shrink time must
 * have (nearly) arrived, the import must be FULL, and we must hold more
 * grant than a single RPC needs. */
755 static int osc_should_shrink_grant(struct client_obd *client)
757 cfs_time_t time = cfs_time_current();
758 cfs_time_t next_shrink = client->cl_next_shrink_grant;
760 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
761 OBD_CONNECT_GRANT_SHRINK) == 0)
/* allow firing up to 5 ticks early to avoid just missing the window */
764 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
765 /* Get the current RPC size directly, instead of going via:
766 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
767 * Keep comment here so that it can be found by searching. */
768 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
770 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
771 client->cl_avail_grant > brw_size)
774 osc_update_next_shrink(client);
/* Periodic timeout callback: walk every client registered on the
 * grant-shrink list and shrink those that are due. */
779 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
781 struct client_obd *client;
783 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
784 if (osc_should_shrink_grant(client))
785 osc_shrink_grant(client);
/* Register @client with the periodic grant-shrink timeout machinery and
 * schedule its first shrink. */
790 static int osc_add_shrink_grant(struct client_obd *client)
794 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
796 osc_grant_shrink_grant_cb, NULL,
797 &client->cl_grant_shrink_list);
799 CERROR("add grant client %s error %d\n", cli_name(client), rc);
802 CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
803 osc_update_next_shrink(client);
/* Unregister @client from the periodic grant-shrink machinery. */
807 static int osc_del_shrink_grant(struct client_obd *client)
809 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize grant state from the server's connect reply @ocd: compute the
 * initial cl_avail_grant, derive extent-tax / chunk-size / max-extent
 * parameters when the server supports GRANT_PARAM, and register for
 * periodic grant shrinking when GRANT_SHRINK was negotiated. */
813 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
816 * ocd_grant is the total grant amount we're expect to hold: if we've
817 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
818 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
821 * race is tolerable here: if we're evicted, but imp_state already
822 * left EVICTED state, then cl_dirty_pages must be 0 already.
824 spin_lock(&cli->cl_loi_list_lock);
825 cli->cl_avail_grant = ocd->ocd_grant;
826 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
827 cli->cl_avail_grant -= cli->cl_reserved_grant;
828 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
829 cli->cl_avail_grant -= cli->cl_dirty_grant;
831 cli->cl_avail_grant -=
832 cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
835 if (cli->cl_avail_grant < 0) {
836 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
837 cli_name(cli), cli->cl_avail_grant,
838 ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
839 /* workaround for servers which do not have the patch from
841 cli->cl_avail_grant = ocd->ocd_grant;
844 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
847 /* overhead for each extent insertion */
848 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
849 /* determine the appropriate chunk size used by osc_extent. */
850 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT,
851 ocd->ocd_grant_blkbits);
852 /* determine maximum extent size, in #pages */
853 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
854 cli->cl_max_extent_pages = size >> PAGE_CACHE_SHIFT;
855 if (cli->cl_max_extent_pages == 0)
856 cli->cl_max_extent_pages = 1;
/* legacy server: no grant parameters negotiated, use defaults */
858 cli->cl_grant_extent_tax = 0;
859 cli->cl_chunkbits = PAGE_CACHE_SHIFT;
860 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
862 spin_unlock(&cli->cl_loi_list_lock);
864 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
865 "chunk bits: %d cl_max_extent_pages: %d\n",
867 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
868 cli->cl_max_extent_pages);
870 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
871 list_empty(&cli->cl_grant_shrink_list))
872 osc_add_shrink_grant(cli);
875 /* We assume that the reason this OSC got a short read is because it read
876 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
877 * via the LOV, and it _knows_ it's reading inside the file, it's just that
878 * this stripe never got written at or beyond this stripe offset yet. */
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet.
 * Zero-fill the unread tail of the page array: the partially-read page
 * from @nob_read onward, then every page after it. */
879 static void handle_short_read(int nob_read, size_t page_count,
880 struct brw_page **pga)
885 /* skip bytes read OK */
886 while (nob_read > 0) {
887 LASSERT (page_count > 0);
889 if (pga[i]->count > nob_read) {
890 /* EOF inside this page */
891 ptr = kmap(pga[i]->pg) +
892 (pga[i]->off & ~PAGE_MASK);
893 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
900 nob_read -= pga[i]->count;
905 /* zero remaining pages */
906 while (page_count-- > 0) {
907 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
908 memset(ptr, 0, pga[i]->count);
/* Validate the per-niobuf return codes of a BRW_WRITE reply: the reply
 * must carry @niocount rcs, each must be exactly 0 (negative -> propagate
 * as the error, positive -> protocol violation), and the bulk must have
 * transferred exactly @requested_nob bytes. */
914 static int check_write_rcs(struct ptlrpc_request *req,
915 int requested_nob, int niocount,
916 size_t page_count, struct brw_page **pga)
921 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
922 sizeof(*remote_rcs) *
924 if (remote_rcs == NULL) {
925 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
929 /* return error if any niobuf was in error */
930 for (i = 0; i < niocount; i++) {
931 if ((int)remote_rcs[i] < 0)
932 return(remote_rcs[i]);
934 if (remote_rcs[i] != 0) {
935 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
936 i, remote_rcs[i], req);
941 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
942 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
943 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages can share one niobuf iff they are byte-contiguous and
 * their flags agree on everything we care about (a small set of known
 * per-page flags is masked out before comparison). */
950 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
952 if (p1->flag != p2->flag) {
953 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
954 OBD_BRW_SYNC | OBD_BRW_ASYNC |
955 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
957 /* warn if we try to combine flags that we don't know to be
959 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
960 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
961 "report this at https://jira.hpdd.intel.com/\n",
/* contiguity check: p2 starts exactly where p1 ends */
967 return (p1->off + p1->count == p2->off);
/* Compute the bulk checksum over the first @nob bytes of @pga using the
 * algorithm selected by @cksum_type.  Honors two fault-injection points:
 * OBD_FAIL_OSC_CHECKSUM_RECEIVE corrupts the first page before hashing
 * (simulating an OST->client data error on read) and
 * OBD_FAIL_OSC_CHECKSUM_SEND perturbs the final checksum on write
 * (so the data itself stays correct for a resend). */
970 static u32 osc_checksum_bulk(int nob, size_t pg_count,
971 struct brw_page **pga, int opc,
972 cksum_type_t cksum_type)
976 struct cfs_crypto_hash_desc *hdesc;
977 unsigned int bufsize;
979 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
981 LASSERT(pg_count > 0);
983 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
985 CERROR("Unable to initialize checksum hash %s\n",
986 cfs_crypto_hash_name(cfs_alg));
987 return PTR_ERR(hdesc);
990 while (nob > 0 && pg_count > 0) {
991 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
993 /* corrupt the data before we compute the checksum, to
994 * simulate an OST->client data error */
995 if (i == 0 && opc == OST_READ &&
996 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
997 unsigned char *ptr = kmap(pga[i]->pg);
998 int off = pga[i]->off & ~PAGE_MASK;
1000 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1003 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1004 pga[i]->off & ~PAGE_MASK,
1006 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1007 (int)(pga[i]->off & ~PAGE_MASK));
1009 nob -= pga[i]->count;
1014 bufsize = sizeof(cksum);
1015 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1017 /* For sending we only compute the wrong checksum instead
1018 * of corrupting the data so it is still correct on a redo */
1019 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a BRW (bulk read/write) RPC for @page_count pages in @pga.
 * Allocates the request (writes come from the shared pool so they can
 * proceed under memory pressure), counts mergeable pages into niobufs,
 * prepares the bulk descriptor, packs the obdo/ioobj/niobuf buffers,
 * piggy-backs cache/grant announcements and optional bulk checksums,
 * and returns the ready-to-send request via *reqp.
 * @resend: non-zero marks the request OBD_FL_RECOV_RESEND.
 * Returns 0 or a negative errno; on failure the request is freed. */
1026 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1027 u32 page_count, struct brw_page **pga,
1028 struct ptlrpc_request **reqp, int resend)
1030 struct ptlrpc_request *req;
1031 struct ptlrpc_bulk_desc *desc;
1032 struct ost_body *body;
1033 struct obd_ioobj *ioobj;
1034 struct niobuf_remote *niobuf;
1035 int niocount, i, requested_nob, opc, rc;
1036 struct osc_brw_async_args *aa;
1037 struct req_capsule *pill;
1038 struct brw_page *pg_prev;
/* fault-injection entry points for testing the resend paths */
1041 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1042 RETURN(-ENOMEM); /* Recoverable */
1043 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1044 RETURN(-EINVAL); /* Fatal */
1046 if ((cmd & OBD_BRW_WRITE) != 0) {
/* writes draw from the pre-allocated pool to survive memory pressure */
1048 req = ptlrpc_request_alloc_pool(cli->cl_import,
1050 &RQF_OST_BRW_WRITE);
1053 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* one niobuf per run of contiguous, flag-compatible pages */
1058 for (niocount = i = 1; i < page_count; i++) {
1059 if (!can_merge_pages(pga[i - 1], pga[i]))
1063 pill = &req->rq_pill;
1064 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1066 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1067 niocount * sizeof(*niobuf));
1069 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1071 ptlrpc_request_free(req);
1074 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1075 ptlrpc_at_set_req_timeout(req);
1076 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1078 req->rq_no_retry_einprogress = 1;
1080 desc = ptlrpc_prep_bulk_imp(req, page_count,
1081 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1082 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1083 PTLRPC_BULK_PUT_SINK) |
1084 PTLRPC_BULK_BUF_KIOV,
1086 &ptlrpc_bulk_kiov_pin_ops);
1089 GOTO(out, rc = -ENOMEM);
1090 /* NB request now owns desc and will free it when it gets freed */
1092 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1093 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1094 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1095 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1097 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1099 obdo_to_ioobj(oa, ioobj);
1100 ioobj->ioo_bufcnt = niocount;
1101 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1102 * that might be send for this request. The actual number is decided
1103 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1104 * "max - 1" for old client compatibility sending "0", and also so the
1105 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1106 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1107 LASSERT(page_count > 0);
/* add each page to the bulk and fill/merge its niobuf entry */
1109 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1110 struct brw_page *pg = pga[i];
1111 int poff = pg->off & ~PAGE_MASK;
1113 LASSERT(pg->count > 0);
1114 /* make sure there is no gap in the middle of page array */
1115 LASSERTF(page_count == 1 ||
1116 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1117 ergo(i > 0 && i < page_count - 1,
1118 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1119 ergo(i == page_count - 1, poff == 0)),
1120 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1121 i, page_count, pg, pg->off, pg->count);
1122 LASSERTF(i == 0 || pg->off > pg_prev->off,
1123 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1124 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1126 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1127 pg_prev->pg, page_private(pg_prev->pg),
1128 pg_prev->pg->index, pg_prev->off);
1129 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1130 (pg->flag & OBD_BRW_SRVLOCK));
1132 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1133 requested_nob += pg->count;
1135 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1137 niobuf->rnb_len += pg->count;
1139 niobuf->rnb_offset = pg->off;
1140 niobuf->rnb_len = pg->count;
1141 niobuf->rnb_flags = pg->flag;
1146 LASSERTF((void *)(niobuf - niocount) ==
1147 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1148 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1149 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1151 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1153 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1154 body->oa.o_valid |= OBD_MD_FLFLAGS;
1155 body->oa.o_flags = 0;
1157 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1160 if (osc_should_shrink_grant(cli))
1161 osc_shrink_grant_local(cli, &body->oa);
1163 /* size[REQ_REC_OFF] still sizeof (*body) */
1164 if (opc == OST_WRITE) {
1165 if (cli->cl_checksum &&
1166 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1167 /* store cl_cksum_type in a local variable since
1168 * it can be changed via lprocfs */
1169 cksum_type_t cksum_type = cli->cl_cksum_type;
1171 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1172 oa->o_flags &= OBD_FL_LOCAL_MASK;
1173 body->oa.o_flags = 0;
1175 body->oa.o_flags |= cksum_type_pack(cksum_type);
1176 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1177 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1181 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1183 /* save this in 'oa', too, for later checking */
1184 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1185 oa->o_flags |= cksum_type_pack(cksum_type);
1187 /* clear out the checksum flag, in case this is a
1188 * resend but cl_checksum is no longer set. b=11238 */
1189 oa->o_valid &= ~OBD_MD_FLCKSUM;
1191 oa->o_cksum = body->oa.o_cksum;
1192 /* 1 RC per niobuf */
1193 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1194 sizeof(__u32) * niocount);
/* read path: just request that the server checksum the reply bulk */
1196 if (cli->cl_checksum &&
1197 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1198 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1199 body->oa.o_flags = 0;
1200 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1201 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1204 ptlrpc_request_set_replen(req);
1206 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1207 aa = ptlrpc_req_async_args(req);
1209 aa->aa_requested_nob = requested_nob;
1210 aa->aa_nio_count = niocount;
1211 aa->aa_page_count = page_count;
1215 INIT_LIST_HEAD(&aa->aa_oaps);
1218 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1219 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1220 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1221 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1225 ptlrpc_req_finished(req);
/* Diagnose a BRW_WRITE checksum mismatch: recompute the checksum over the
 * still-cached pages and compare against both the client's original and
 * the server's value to classify where the corruption happened (server
 * used wrong algorithm / client data changed after checksumming, e.g.
 * mmap IO / corrupted in transit).  Logs a console error with the full
 * object identity and extent; the mismatch itself is not repaired here. */
1229 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1230 __u32 client_cksum, __u32 server_cksum, int nob,
1231 size_t page_count, struct brw_page **pga,
1232 cksum_type_t client_cksum_type)
1236 cksum_type_t cksum_type;
1238 if (server_cksum == client_cksum) {
1239 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* recompute with the algorithm the server actually used */
1243 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1245 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1248 if (cksum_type != client_cksum_type)
1249 msg = "the server did not use the checksum type specified in "
1250 "the original request - likely a protocol problem";
1251 else if (new_cksum == server_cksum)
1252 msg = "changed on the client after we checksummed it - "
1253 "likely false positive due to mmap IO (bug 11742)";
1254 else if (new_cksum == client_cksum)
1255 msg = "changed in transit before arrival at OST";
1257 msg = "changed in transit AND doesn't match the original - "
1258 "likely false positive due to mmap IO (bug 11742)";
1260 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1261 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1262 msg, libcfs_nid2str(peer->nid),
1263 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1264 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1265 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1266 POSTID(&oa->o_oi), pga[0]->off,
1267 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1268 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1269 "client csum now %x\n", client_cksum, client_cksum_type,
1270 server_cksum, cksum_type, new_cksum);
1274 /* Note rc enters this function as number of bytes transferred */
/*
 * Complete a bulk read/write RPC: unpack the reply body, update quota and
 * grant state, verify checksums (write: against the saved client checksum;
 * read: recompute locally and compare to the server's), handle short reads,
 * and finally copy the wire obdo back into the caller's obdo.
 *
 * NOTE(review): lossy excerpt -- error-path returns/GOTOs between the
 * visible lines are elided; comments cover only what is shown.
 */
1275 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1277 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1278 const lnet_process_id_t *peer =
1279 &req->rq_import->imp_connection->c_peer;
1280 struct client_obd *cli = aa->aa_cli;
1281 struct ost_body *body;
1282 u32 client_cksum = 0;
/* -EDQUOT is handled below (quota flags), all other errors bail out. */
1285 if (rc < 0 && rc != -EDQUOT) {
1286 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1290 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1291 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1293 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1297 /* set/clear over quota flag for a uid/gid */
1298 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1299 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1300 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1302 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1303 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1305 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1308 osc_update_grant(cli, body);
/* Checksum was saved into aa_oa by osc_brw_prep_request(). */
1313 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1314 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1316 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
/* A write reply should never carry a positive byte count. */
1318 CERROR("Unexpected +ve rc %d\n", rc);
1321 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1323 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* Compare the server's checksum against the one we computed at send. */
1326 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1327 check_write_checksum(&body->oa, peer, client_cksum,
1328 body->oa.o_cksum, aa->aa_requested_nob,
1329 aa->aa_page_count, aa->aa_ppga,
1330 cksum_type_unpack(aa->aa_oa->o_flags)))
1333 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1334 aa->aa_page_count, aa->aa_ppga);
1338 /* The rest of this function executes only for OST_READs */
1340 /* if unwrap_bulk failed, return -EAGAIN to retry */
1341 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1343 GOTO(out, rc = -EAGAIN);
/* Sanity: server cannot have transferred more than we asked for. */
1345 if (rc > aa->aa_requested_nob) {
1346 CERROR("Unexpected rc %d (%d requested)\n", rc,
1347 aa->aa_requested_nob);
1351 if (rc != req->rq_bulk->bd_nob_transferred) {
1352 CERROR ("Unexpected rc %d (%d transferred)\n",
1353 rc, req->rq_bulk->bd_nob_transferred);
/* Short read: zero-fill the tail of the page array. */
1357 if (rc < aa->aa_requested_nob)
1358 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1360 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1361 static int cksum_counter;
1362 u32 server_cksum = body->oa.o_cksum;
1365 cksum_type_t cksum_type;
1367 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1368 body->oa.o_flags : 0);
1369 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1370 aa->aa_ppga, OST_READ,
/* Note in the message when the bulk came through an LNet router. */
1373 if (peer->nid != req->rq_bulk->bd_sender) {
1375 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1378 if (server_cksum != client_cksum) {
1379 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1380 "%s%s%s inode "DFID" object "DOSTID
1381 " extent ["LPU64"-"LPU64"]\n",
1382 req->rq_import->imp_obd->obd_name,
1383 libcfs_nid2str(peer->nid),
1385 body->oa.o_valid & OBD_MD_FLFID ?
1386 body->oa.o_parent_seq : (__u64)0,
1387 body->oa.o_valid & OBD_MD_FLFID ?
1388 body->oa.o_parent_oid : 0,
1389 body->oa.o_valid & OBD_MD_FLFID ?
1390 body->oa.o_parent_ver : 0,
1391 POSTID(&body->oa.o_oi),
1392 aa->aa_ppga[0]->off,
1393 aa->aa_ppga[aa->aa_page_count-1]->off +
1394 aa->aa_ppga[aa->aa_page_count-1]->count -
1396 CERROR("client %x, server %x, cksum_type %x\n",
1397 client_cksum, server_cksum, cksum_type);
1399 aa->aa_oa->o_cksum = client_cksum;
1403 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Client asked for a checksum but the server didn't return one;
 * log with exponential backoff (power-of-two counter test). */
1406 } else if (unlikely(client_cksum)) {
1407 static int cksum_missed;
1410 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1411 CERROR("Checksum %u requested from %s but not sent\n",
1412 cksum_missed, libcfs_nid2str(peer->nid));
/* Propagate server-updated attributes back to the caller's obdo. */
1418 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1419 aa->aa_oa, &body->oa);
/*
 * Resend a failed BRW RPC after a recoverable error (e.g. -EINPROGRESS).
 *
 * Builds a fresh request with the same pages/obdo via osc_brw_prep_request(),
 * transfers the async args, oap and extent lists from the old request to the
 * new one, applies a capped resend delay, and queues it on ptlrpcd.
 *
 * NOTE(review): lossy excerpt -- early-return paths between visible lines
 * are elided.
 */
1424 static int osc_brw_redo_request(struct ptlrpc_request *request,
1425 struct osc_brw_async_args *aa, int rc)
1427 struct ptlrpc_request *new_req;
1428 struct osc_brw_async_args *new_aa;
1429 struct osc_async_page *oap;
/* -EINPROGRESS resends are routine; anything else is worth D_ERROR. */
1432 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1433 "redo for recoverable error %d", rc);
1435 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1436 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1437 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1438 aa->aa_ppga, &new_req, 1);
/* Abort the redo if any page's originating I/O was interrupted. */
1442 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1443 if (oap->oap_request != NULL) {
1444 LASSERTF(request == oap->oap_request,
1445 "request %p != oap_request %p\n",
1446 request, oap->oap_request);
1447 if (oap->oap_interrupted) {
1448 ptlrpc_req_finished(new_req);
1453 /* New request takes over pga and oaps from old request.
1454 * Note that copying a list_head doesn't work, need to move it... */
1456 new_req->rq_interpret_reply = request->rq_interpret_reply;
1457 new_req->rq_async_args = request->rq_async_args;
1458 new_req->rq_commit_cb = request->rq_commit_cb;
1459 /* cap resend delay to the current request timeout, this is similar to
1460 * what ptlrpc does (see after_reply()) */
1461 if (aa->aa_resends > new_req->rq_timeout)
1462 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1464 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
/* Pin the import generation so eviction is detected on completion. */
1465 new_req->rq_generation_set = 1;
1466 new_req->rq_import_generation = request->rq_import_generation;
1468 new_aa = ptlrpc_req_async_args(new_req);
/* Move (not copy) the oap and extent lists onto the new async args. */
1470 INIT_LIST_HEAD(&new_aa->aa_oaps);
1471 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1472 INIT_LIST_HEAD(&new_aa->aa_exts);
1473 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1474 new_aa->aa_resends = aa->aa_resends;
/* Repoint each oap's request reference from the old req to the new. */
1476 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1477 if (oap->oap_request) {
1478 ptlrpc_req_finished(oap->oap_request);
1479 oap->oap_request = ptlrpc_request_addref(new_req);
1483 /* XXX: This code will run into problem if we're going to support
1484 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1485 * and wait for all of them to be finished. We should inherit request
1486 * set from old request. */
1487 ptlrpcd_add_req(new_req);
1489 DEBUG_REQ(D_INFO, new_req, "new request");
1494 * ugh, we want disk allocation on the target to happen in offset order. we'll
1495 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1496 * fine for our small page arrays and doesn't require allocation. its an
1497 * insertion sort that swaps elements that are strides apart, shrinking the
1498 * stride down until its '1' and the array is sorted.
/* Sort @array of @num brw_page pointers by ->off, ascending (shellsort). */
1500 static void sort_brw_pages(struct brw_page **array, int num)
1503 struct brw_page *tmp;
/* Knuth gap sequence: 1, 4, 13, 40, ... grown past num first. */
1507 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1512 for (i = stride ; i < num ; i++) {
/* Gapped insertion: shift larger elements up by one stride. */
1515 while (j >= stride && array[j - stride]->off > tmp->off) {
1516 array[j] = array[j - stride];
1521 } while (stride > 1);
/* Free the brw_page pointer array allocated for a BRW RPC (the array
 * itself only -- the pages it pointed at are owned elsewhere). */
1524 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1526 LASSERT(ppga != NULL);
1527 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Interpret callback for a BRW RPC: finish the bulk transfer, resend on
 * recoverable errors, fold server-returned attributes (blocks, a/m/ctime,
 * size/KMS on write) into the cl_object, finish all extents, release the
 * page array and wake cache waiters.
 *
 * NOTE(review): lossy excerpt -- some branch/return lines between the
 * visible ones are elided.
 */
1530 static int brw_interpret(const struct lu_env *env,
1531 struct ptlrpc_request *req, void *data, int rc)
1533 struct osc_brw_async_args *aa = data;
1534 struct osc_extent *ext;
1535 struct osc_extent *tmp;
1536 struct client_obd *cli = aa->aa_cli;
1539 rc = osc_brw_fini_request(req, rc);
1540 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1541 /* When server return -EINPROGRESS, client should always retry
1542 * regardless of the number of times the bulk was resent already. */
1543 if (osc_recoverable_error(rc)) {
/* Import generation changed: we were evicted, don't resend. */
1544 if (req->rq_import_generation !=
1545 req->rq_import->imp_generation) {
1546 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1547 ""DOSTID", rc = %d.\n",
1548 req->rq_import->imp_obd->obd_name,
1549 POSTID(&aa->aa_oa->o_oi), rc);
1550 } else if (rc == -EINPROGRESS ||
1551 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1552 rc = osc_brw_redo_request(req, aa, rc);
1554 CERROR("%s: too many resent retries for object: "
1555 ""LPU64":"LPU64", rc = %d.\n",
1556 req->rq_import->imp_obd->obd_name,
1557 POSTID(&aa->aa_oa->o_oi), rc);
1562 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1567 struct obdo *oa = aa->aa_oa;
1568 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1569 unsigned long valid = 0;
1570 struct cl_object *obj;
1571 struct osc_async_page *last;
/* Locate the cl_object via the last page of the transfer. */
1573 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1574 obj = osc2cl(last->oap_obj);
/* Fold server-returned attributes into the object under attr lock. */
1576 cl_object_attr_lock(obj);
1577 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1578 attr->cat_blocks = oa->o_blocks;
1579 valid |= CAT_BLOCKS;
1581 if (oa->o_valid & OBD_MD_FLMTIME) {
1582 attr->cat_mtime = oa->o_mtime;
1585 if (oa->o_valid & OBD_MD_FLATIME) {
1586 attr->cat_atime = oa->o_atime;
1589 if (oa->o_valid & OBD_MD_FLCTIME) {
1590 attr->cat_ctime = oa->o_ctime;
1594 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1595 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1596 loff_t last_off = last->oap_count + last->oap_obj_off +
1599 /* Change file size if this is an out of quota or
1600 * direct IO write and it extends the file size */
1601 if (loi->loi_lvb.lvb_size < last_off) {
1602 attr->cat_size = last_off;
1605 /* Extend KMS if it's not a lockless write */
1606 if (loi->loi_kms < last_off &&
1607 oap2osc_page(last)->ops_srvlock == 0) {
1608 attr->cat_kms = last_off;
1614 cl_object_attr_update(env, obj, attr, valid);
1615 cl_object_attr_unlock(obj);
1617 OBDO_FREE(aa->aa_oa);
/* Successful writes leave pages "unstable" until the server commits. */
1619 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1620 osc_inc_unstable_pages(req);
1622 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1623 list_del_init(&ext->oe_link);
1624 osc_extent_finish(env, ext, 1, rc);
1626 LASSERT(list_empty(&aa->aa_exts));
1627 LASSERT(list_empty(&aa->aa_oaps));
1629 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1630 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1632 spin_lock(&cli->cl_loi_list_lock);
1633 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1634 * is called so we know whether to go to sync BRWs or wait for more
1635 * RPCs to complete */
1636 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1637 cli->cl_w_in_flight--;
1639 cli->cl_r_in_flight--;
1640 osc_wake_cache_waiters(cli);
1641 spin_unlock(&cli->cl_loi_list_lock);
/* Kick the queue: a completed RPC may allow another to be built. */
1643 osc_io_unplug(env, cli, NULL);
/*
 * Commit callback for a BRW request: once the server has committed the
 * write, pages pinned as "unstable" can be released.  rq_unstable is
 * tested and cleared under rq_lock to close the race with
 * osc_inc_unstable_pages() running from osc_extent_finish().
 */
1647 static void brw_commit(struct ptlrpc_request *req)
1649 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1650 * this called via the rq_commit_cb, I need to ensure
1651 * osc_dec_unstable_pages is still called. Otherwise unstable
1652 * pages may be leaked. */
1653 spin_lock(&req->rq_lock)；
1654 if (likely(req->rq_unstable)) {
1655 req->rq_unstable = 0;
1656 spin_unlock(&req->rq_lock);
/* Drop the lock before the (potentially heavier) accounting call. */
1658 osc_dec_unstable_pages(req);
/* Race lost: mark committed so the other path does the decrement. */
1660 req->rq_committed = 1;
1661 spin_unlock(&req->rq_lock);
1666 * Build an RPC by the list of extent @ext_list. The caller must ensure
1667 * that the total pages in this list are NOT over max pages per RPC.
1668 * Extents in the list must be in OES_RPC state.
/*
 * Assemble one BRW read or write RPC from the pages of all extents on
 * @ext_list, hand ownership of pages/extents to the request's async args,
 * account it in the read/write in-flight stats, and queue it on ptlrpcd.
 * Returns 0 on success; on failure all extents are finished with the error.
 *
 * NOTE(review): lossy excerpt -- allocation of @oa, some loop/branch lines
 * and the out: label are elided from this view.
 */
1670 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1671 struct list_head *ext_list, int cmd)
1673 struct ptlrpc_request *req = NULL;
1674 struct osc_extent *ext;
1675 struct brw_page **pga = NULL;
1676 struct osc_brw_async_args *aa = NULL;
1677 struct obdo *oa = NULL;
1678 struct osc_async_page *oap;
1679 struct osc_object *obj = NULL;
1680 struct cl_req_attr *crattr = NULL;
1681 loff_t starting_offset = OBD_OBJECT_EOF;
1682 loff_t ending_offset = 0;
1686 bool soft_sync = false;
1687 bool interrupted = false;
1691 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1692 struct ost_body *body;
1694 LASSERT(!list_empty(ext_list));
1696 /* add pages into rpc_list to build BRW rpc */
/* First pass: total up pages/grant and note memory-pressure extents. */
1697 list_for_each_entry(ext, ext_list, oe_link) {
1698 LASSERT(ext->oe_state == OES_RPC);
1699 mem_tight |= ext->oe_memalloc;
1700 grant += ext->oe_grants;
1701 page_count += ext->oe_nr_pages;
1706 soft_sync = osc_over_unstable_soft_limit(cli);
1708 mpflag = cfs_memory_pressure_get_and_set();
1710 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1712 GOTO(out, rc = -ENOMEM);
1716 GOTO(out, rc = -ENOMEM);
/* Second pass: fill the page array, track the transfer's offset range. */
1719 list_for_each_entry(ext, ext_list, oe_link) {
1720 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1722 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1724 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1725 pga[i] = &oap->oap_brw_page;
1726 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1729 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1730 if (starting_offset == OBD_OBJECT_EOF ||
1731 starting_offset > oap->oap_obj_off)
1732 starting_offset = oap->oap_obj_off;
1734 LASSERT(oap->oap_page_off == 0);
1735 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1736 ending_offset = oap->oap_obj_off +
1739 LASSERT(oap->oap_page_off + oap->oap_count ==
1741 if (oap->oap_interrupted)
1746 /* first page in the list */
1747 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
/* Let upper layers (vvp/lov) fill request attributes into the obdo. */
1749 crattr = &osc_env_info(env)->oti_req_attr;
1750 memset(crattr, 0, sizeof(*crattr));
1751 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1752 crattr->cra_flags = ~0ULL;
1753 crattr->cra_page = oap2cl_page(oap);
1754 crattr->cra_oa = oa;
1755 cl_req_attr_set(env, osc2cl(obj), crattr);
1757 if (cmd == OBD_BRW_WRITE)
1758 oa->o_grant_used = grant;
/* Offset-sorted pages let the OST allocate disk blocks sequentially. */
1760 sort_brw_pages(pga, page_count);
1761 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1763 CERROR("prep_req failed: %d\n", rc);
1767 req->rq_commit_cb = brw_commit;
1768 req->rq_interpret_reply = brw_interpret;
1769 req->rq_memalloc = mem_tight != 0;
1770 oap->oap_request = ptlrpc_request_addref(req);
1771 if (interrupted && !req->rq_intr)
1772 ptlrpc_mark_interrupted(req);
1774 /* Need to update the timestamps after the request is built in case
1775 * we race with setattr (locally or in queue at OST). If OST gets
1776 * later setattr before earlier BRW (as determined by the request xid),
1777 * the OST will not use BRW timestamps. Sadly, there is no obvious
1778 * way to do this in a single call. bug 10150 */
1779 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1780 crattr->cra_oa = &body->oa;
1781 crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1782 cl_req_attr_set(env, osc2cl(obj), crattr);
1783 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1785 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1786 aa = ptlrpc_req_async_args(req);
/* Transfer page and extent ownership to the request's async args. */
1787 INIT_LIST_HEAD(&aa->aa_oaps);
1788 list_splice_init(&rpc_list, &aa->aa_oaps);
1789 INIT_LIST_HEAD(&aa->aa_exts);
1790 list_splice_init(ext_list, &aa->aa_exts);
/* Account in-flight RPC and update lprocfs histograms. */
1792 spin_lock(&cli->cl_loi_list_lock);
1793 starting_offset >>= PAGE_CACHE_SHIFT;
1794 if (cmd == OBD_BRW_READ) {
1795 cli->cl_r_in_flight++;
1796 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1797 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1798 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1799 starting_offset + 1);
1801 cli->cl_w_in_flight++;
1802 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1803 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1804 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1805 starting_offset + 1);
1807 spin_unlock(&cli->cl_loi_list_lock);
1809 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1810 page_count, aa, cli->cl_r_in_flight,
1811 cli->cl_w_in_flight);
1813 ptlrpcd_add_req(req);
1819 cfs_memory_pressure_restore(mpflag);
1822 LASSERT(req == NULL);
1827 OBD_FREE(pga, sizeof(*pga) * page_count);
1828 /* this should happen rarely and is pretty bad, it makes the
1829 * pending list not follow the dirty order */
1830 while (!list_empty(ext_list)) {
1831 ext = list_entry(ext_list->next, struct osc_extent,
1833 list_del_init(&ext->oe_link);
1834 osc_extent_finish(env, ext, 0, rc);
/*
 * Attach @einfo->ei_cbdata to @lock->l_ast_data if it is unset, after
 * asserting the lock's callbacks/type match the enqueue info.  Returns
 * (per the visible logic) whether the lock's ast_data equals the given
 * data -- i.e. whether this lock belongs to the expected object.
 */
1840 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1841 struct ldlm_enqueue_info *einfo)
1843 void *data = einfo->ei_cbdata;
1846 LASSERT(lock != NULL);
1847 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1848 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1849 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1850 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
/* ast_data is read-modify-tested under the resource lock. */
1852 lock_res_and_lock(lock);
1854 if (lock->l_ast_data == NULL)
1855 lock->l_ast_data = data;
1856 if (lock->l_ast_data == data)
1859 unlock_res_and_lock(lock);
/*
 * Handle-based wrapper around osc_set_lock_data_with_check(): resolve
 * @lockh to a lock, set/check its ast_data, and drop the reference.
 * A NULL lock here typically means the client was evicted.
 */
1864 static int osc_set_data_with_check(struct lustre_handle *lockh,
1865 struct ldlm_enqueue_info *einfo)
1867 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1871 set = osc_set_lock_data_with_check(lock, einfo);
1872 LDLM_LOCK_PUT(lock);
1874 CERROR("lockh %p, data %p - client evicted?\n",
1875 lockh, einfo->ei_cbdata);
/*
 * Common tail of an OSC lock enqueue: translate an intent-ABORTED reply
 * into the server's policy result, mark the LVB ready on success, invoke
 * the caller's upcall, and drop the enqueue reference on the lock.
 */
1879 static int osc_enqueue_fini(struct ptlrpc_request *req,
1880 osc_enqueue_upcall_f upcall, void *cookie,
1881 struct lustre_handle *lockh, enum ldlm_mode mode,
1882 __u64 *flags, int agl, int errcode)
1884 bool intent = *flags & LDLM_FL_HAS_INTENT;
1888 /* The request was created before ldlm_cli_enqueue call. */
1889 if (intent && errcode == ELDLM_LOCK_ABORTED) {
1890 struct ldlm_reply *rep;
1892 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1893 LASSERT(rep != NULL);
/* Server's intent disposition travels in lock_policy_res1. */
1895 rep->lock_policy_res1 =
1896 ptlrpc_status_ntoh(rep->lock_policy_res1);
1897 if (rep->lock_policy_res1)
1898 errcode = rep->lock_policy_res1;
1900 *flags |= LDLM_FL_LVB_READY;
1901 } else if (errcode == ELDLM_OK) {
1902 *flags |= LDLM_FL_LVB_READY;
1905 /* Call the update callback. */
1906 rc = (*upcall)(cookie, lockh, errcode);
1908 /* release the reference taken in ldlm_cli_enqueue() */
1909 if (errcode == ELDLM_LOCK_MATCHED)
1911 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1912 ldlm_lock_decref(lockh, mode);
/*
 * Interpret callback for an asynchronous lock enqueue: finish the ldlm
 * side of the enqueue, then run the OSC upcall via osc_enqueue_fini().
 * A temporary extra reference on the lock keeps any blocking AST posted
 * by ldlm_cli_enqueue_fini() from racing ahead of the upcall.
 *
 * NOTE(review): lossy excerpt -- the AGL branch around lines 1950-1952 is
 * partially elided.
 */
1917 static int osc_enqueue_interpret(const struct lu_env *env,
1918 struct ptlrpc_request *req,
1919 struct osc_enqueue_args *aa, int rc)
1921 struct ldlm_lock *lock;
1922 struct lustre_handle *lockh = &aa->oa_lockh;
1923 enum ldlm_mode mode = aa->oa_mode;
1924 struct ost_lvb *lvb = aa->oa_lvb;
1925 __u32 lvb_len = sizeof(*lvb);
1930 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
1932 lock = ldlm_handle2lock(lockh);
1933 LASSERTF(lock != NULL,
1934 "lockh "LPX64", req %p, aa %p - client evicted?\n",
1935 lockh->cookie, req, aa);
1937 /* Take an additional reference so that a blocking AST that
1938 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
1939 * to arrive after an upcall has been executed by
1940 * osc_enqueue_fini(). */
1941 ldlm_lock_addref(lockh, mode);
1943 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
1944 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
1946 /* Let CP AST to grant the lock first. */
1947 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* AGL enqueues pass no lvb/flags; supply a local flags word here. */
1950 LASSERT(aa->oa_lvb == NULL);
1951 LASSERT(aa->oa_flags == NULL);
1952 aa->oa_flags = &flags;
1955 /* Complete obtaining the lock procedure. */
1956 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
1957 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
1959 /* Complete osc stuff. */
1960 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
1961 aa->oa_flags, aa->oa_agl, rc);
1963 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* Drop the extra reference taken above, and the handle2lock ref. */
1965 ldlm_lock_decref(lockh, mode);
1966 LDLM_LOCK_PUT(lock);
/* Sentinel request-set pointer: callers pass PTLRPCD_SET to mean "queue on
 * the shared ptlrpcd daemon" rather than on a real ptlrpc_request_set. */
1970 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
1972 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
1973 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
1974 * other synchronous requests, however keeping some locks and trying to obtain
1975 * others may take a considerable amount of time in a case of ost failure; and
1976 * when other sync requests do not get released lock from a client, the client
1977 * is evicted from the cluster -- such scenarious make the life difficult, so
1978 * release locks just after they are obtained. */
/*
 * Enqueue an extent lock for @res_id: first try to match an existing
 * cached lock (read requests may reuse a PW lock); otherwise send a new
 * enqueue, synchronously or via @rqset/ptlrpcd when @async.  @upcall is
 * invoked with the result in all cases.  @agl marks a speculative
 * (async glimpse) enqueue whose result nobody waits on.
 *
 * NOTE(review): lossy excerpt -- kms_valid handling and some branch lines
 * are elided from this view.
 */
1979 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
1980 __u64 *flags, union ldlm_policy_data *policy,
1981 struct ost_lvb *lvb, int kms_valid,
1982 osc_enqueue_upcall_f upcall, void *cookie,
1983 struct ldlm_enqueue_info *einfo,
1984 struct ptlrpc_request_set *rqset, int async, int agl)
1986 struct obd_device *obd = exp->exp_obd;
1987 struct lustre_handle lockh = { 0 };
1988 struct ptlrpc_request *req = NULL;
1989 int intent = *flags & LDLM_FL_HAS_INTENT;
1990 __u64 match_flags = *flags;
1991 enum ldlm_mode mode;
1995 /* Filesystem lock extents are extended to page boundaries so that
1996 * dealing with the page cache is a little smoother. */
1997 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
1998 policy->l_extent.end |= ~PAGE_MASK;
2001 * kms is not valid when either object is completely fresh (so that no
2002 * locks are cached), or object was evicted. In the latter case cached
2003 * lock cannot be used, because it would prime inode state with
2004 * potentially stale LVB.
2009 /* Next, search for already existing extent locks that will cover us */
2010 /* If we're trying to read, we also search for an existing PW lock. The
2011 * VFS and page cache already protect us locally, so lots of readers/
2012 * writers can share a single PW lock.
2014 * There are problems with conversion deadlocks, so instead of
2015 * converting a read lock to a write lock, we'll just enqueue a new
2018 * At some point we should cancel the read lock instead of making them
2019 * send us a blocking callback, but there are problems with canceling
2020 * locks out from other users right now, too. */
2021 mode = einfo->ei_mode;
2022 if (einfo->ei_mode == LCK_PR)
2025 match_flags |= LDLM_FL_LVB_READY;
2027 match_flags |= LDLM_FL_BLOCK_GRANTED;
2028 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2029 einfo->ei_type, policy, mode, &lockh, 0);
2031 struct ldlm_lock *matched;
2033 if (*flags & LDLM_FL_TEST_LOCK)
2036 matched = ldlm_handle2lock(&lockh);
2038 /* AGL enqueues DLM locks speculatively. Therefore if
2039 * it already exists a DLM lock, it wll just inform the
2040 * caller to cancel the AGL process for this stripe. */
2041 ldlm_lock_decref(&lockh, mode);
2042 LDLM_LOCK_PUT(matched);
2044 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2045 *flags |= LDLM_FL_LVB_READY;
2047 /* We already have a lock, and it's referenced. */
2048 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2050 ldlm_lock_decref(&lockh, mode);
2051 LDLM_LOCK_PUT(matched);
/* Matched lock belongs to another object: release and enqueue anew. */
2054 ldlm_lock_decref(&lockh, mode);
2055 LDLM_LOCK_PUT(matched);
2060 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
/* No usable cached lock: build a real enqueue request with LVB reply. */
2064 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2065 &RQF_LDLM_ENQUEUE_LVB);
2069 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2071 ptlrpc_request_free(req);
2075 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2077 ptlrpc_request_set_replen(req);
2080 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2081 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2083 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2084 sizeof(*lvb), LVB_T_OST, &lockh, async);
2087 struct osc_enqueue_args *aa;
2088 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2089 aa = ptlrpc_req_async_args(req);
2091 aa->oa_mode = einfo->ei_mode;
2092 aa->oa_type = einfo->ei_type;
2093 lustre_handle_copy(&aa->oa_lockh, &lockh);
2094 aa->oa_upcall = upcall;
2095 aa->oa_cookie = cookie;
2098 aa->oa_flags = flags;
2101 /* AGL is essentially to enqueue an DLM lock
2102 * in advance, so we don't care about the
2103 * result of AGL enqueue. */
2105 aa->oa_flags = NULL;
2108 req->rq_interpret_reply =
2109 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2110 if (rqset == PTLRPCD_SET)
2111 ptlrpcd_add_req(req);
2113 ptlrpc_set_add_req(rqset, req);
2114 } else if (intent) {
2115 ptlrpc_req_finished(req);
/* Synchronous path: run the completion/upcall inline. */
2120 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2123 ptlrpc_req_finished(req);
/*
 * Match an existing cached extent lock for @res_id covering @policy.
 * As in osc_enqueue_base(), a read may be satisfied by a PW lock; in that
 * case the visible logic re-references the lock as PR and drops the PW
 * reference so the caller holds the mode it asked for.
 *
 * NOTE(review): lossy excerpt -- the return statements are elided; `rc`
 * appears to carry the matched mode in the visible lines.
 */
2128 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2129 enum ldlm_type type, union ldlm_policy_data *policy,
2130 enum ldlm_mode mode, __u64 *flags, void *data,
2131 struct lustre_handle *lockh, int unref)
2133 struct obd_device *obd = exp->exp_obd;
2134 __u64 lflags = *flags;
2138 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2141 /* Filesystem lock extents are extended to page boundaries so that
2142 * dealing with the page cache is a little smoother */
2143 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2144 policy->l_extent.end |= ~PAGE_MASK;
2146 /* Next, search for already existing extent locks that will cover us */
2147 /* If we're trying to read, we also search for an existing PW lock. The
2148 * VFS and page cache already protect us locally, so lots of readers/
2149 * writers can share a single PW lock. */
2153 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2154 res_id, type, policy, rc, lockh, unref);
/* Matched lock must belong to our object, else drop it. */
2157 if (!osc_set_data_with_check(lockh, data)) {
2158 if (!(lflags & LDLM_FL_TEST_LOCK))
2159 ldlm_lock_decref(lockh, rc);
/* Asked for PR but matched PW: hold PR, release the PW reference. */
2163 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2164 ldlm_lock_addref(lockh, LCK_PR);
2165 ldlm_lock_decref(lockh, LCK_PW);
/*
 * Interpret callback for an async OST_STATFS: unpack the obd_statfs from
 * the reply, copy it to the caller's buffer, and invoke the obd_info
 * completion callback.  -ENOTCONN/-EAGAIN on a NODELAY request are not
 * treated as hard errors (per the visible branch).
 */
2172 static int osc_statfs_interpret(const struct lu_env *env,
2173 struct ptlrpc_request *req,
2174 struct osc_async_args *aa, int rc)
2176 struct obd_statfs *msfs;
2180 /* The request has in fact never been sent
2181 * due to issues at a higher level (LOV).
2182 * Exit immediately since the caller is
2183 * aware of the problem and takes care
2184 * of the clean up */
2187 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2188 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2194 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2196 GOTO(out, rc = -EPROTO);
/* Struct copy of the server's statfs into the caller's buffer. */
2199 *aa->aa_oi->oi_osfs = *msfs;
2201 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Send OST_STATFS asynchronously on @rqset; the reply is delivered to the
 * caller through osc_statfs_interpret() / oinfo->oi_cb_up.
 * NOTE(review): @max_age is accepted but, per the comment below, not yet
 * transmitted to the server.
 */
2205 static int osc_statfs_async(struct obd_export *exp,
2206 struct obd_info *oinfo, __u64 max_age,
2207 struct ptlrpc_request_set *rqset)
2209 struct obd_device *obd = class_exp2obd(exp);
2210 struct ptlrpc_request *req;
2211 struct osc_async_args *aa;
2215 /* We could possibly pass max_age in the request (as an absolute
2216 * timestamp or a "seconds.usec ago") so the target can avoid doing
2217 * extra calls into the filesystem if that isn't necessary (e.g.
2218 * during mount that would help a bit). Having relative timestamps
2219 * is not so great if request processing is slow, while absolute
2220 * timestamps are not ideal because they need time synchronization. */
2221 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2225 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2227 ptlrpc_request_free(req);
2230 ptlrpc_request_set_replen(req);
/* statfs is served from the OST's create portal. */
2231 req->rq_request_portal = OST_CREATE_PORTAL;
2232 ptlrpc_at_set_req_timeout(req);
2234 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2235 /* procfs requests not want stat in wait for avoid deadlock */
2236 req->rq_no_resend = 1;
2237 req->rq_no_delay = 1;
2240 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2241 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2242 aa = ptlrpc_req_async_args(req);
2245 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: take a reference on the import under cl_sem
 * (racing with disconnect, see bug 15684), send the request, wait, and
 * copy the reply into @osfs.
 * NOTE(review): @max_age/@flags handling beyond NODELAY is partly elided
 * in this view; the copy of msfs into *osfs falls in a gap after 2302.
 */
2249 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2250 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2252 struct obd_device *obd = class_exp2obd(exp);
2253 struct obd_statfs *msfs;
2254 struct ptlrpc_request *req;
2255 struct obd_import *imp = NULL;
2259 /*Since the request might also come from lprocfs, so we need
2260 *sync this with client_disconnect_export Bug15684*/
2261 down_read(&obd->u.cli.cl_sem);
2262 if (obd->u.cli.cl_import)
2263 imp = class_import_get(obd->u.cli.cl_import);
2264 up_read(&obd->u.cli.cl_sem);
2268 /* We could possibly pass max_age in the request (as an absolute
2269 * timestamp or a "seconds.usec ago") so the target can avoid doing
2270 * extra calls into the filesystem if that isn't necessary (e.g.
2271 * during mount that would help a bit). Having relative timestamps
2272 * is not so great if request processing is slow, while absolute
2273 * timestamps are not ideal because they need time synchronization. */
2274 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* The import reference is only needed for the allocation above. */
2276 class_import_put(imp);
2281 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2283 ptlrpc_request_free(req);
2286 ptlrpc_request_set_replen(req);
2287 req->rq_request_portal = OST_CREATE_PORTAL;
2288 ptlrpc_at_set_req_timeout(req);
2290 if (flags & OBD_STATFS_NODELAY) {
2291 /* procfs requests not want stat in wait for avoid deadlock */
2292 req->rq_no_resend = 1;
2293 req->rq_no_delay = 1;
2296 rc = ptlrpc_queue_wait(req);
2300 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2302 GOTO(out, rc = -EPROTO);
2309 ptlrpc_req_finished(req);
/*
 * OSC ioctl dispatcher.  Pins the module for the duration of the call,
 * then handles recovery/activation/ping ioctls; anything unrecognized
 * returns -ENOTTY.
 */
2313 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2314 void *karg, void __user *uarg)
2316 struct obd_device *obd = exp->exp_obd;
2317 struct obd_ioctl_data *data = karg;
/* Guard against the module being unloaded mid-ioctl. */
2321 if (!try_module_get(THIS_MODULE)) {
2322 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2323 module_name(THIS_MODULE));
2327 case OBD_IOC_CLIENT_RECOVER:
2328 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2329 data->ioc_inlbuf1, 0);
2333 case IOC_OSC_SET_ACTIVE:
2334 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2337 case OBD_IOC_PING_TARGET:
2338 err = ptlrpc_obd_ping(obd);
2341 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2342 cmd, current_comm());
2343 GOTO(out, err = -ENOTTY);
2346 module_put(THIS_MODULE);
/*
 * Set a named parameter on this OSC, asynchronously where applicable.
 * Several keys are handled purely on the client (CHECKSUM, SPTLRPC_CONF,
 * FLUSH_CTX, CACHE_SET, CACHE_LRU_SHRINK); everything else is packed into
 * an OST_SET_INFO RPC (GRANT_SHRINK gets its own format and is queued on
 * ptlrpcd rather than the caller's @set).
 *
 * NOTE(review): lossy excerpt -- several return/error lines between the
 * visible ones are elided.
 */
2350 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2351 u32 keylen, void *key,
2352 u32 vallen, void *val,
2353 struct ptlrpc_request_set *set)
2355 struct ptlrpc_request *req;
2356 struct obd_device *obd = exp->exp_obd;
2357 struct obd_import *imp = class_exp2cliimp(exp);
2362 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* Client-local keys: no RPC needed for these. */
2364 if (KEY_IS(KEY_CHECKSUM)) {
2365 if (vallen != sizeof(int))
2367 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2371 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2372 sptlrpc_conf_client_adapt(obd);
2376 if (KEY_IS(KEY_FLUSH_CTX)) {
2377 sptlrpc_import_flush_my_ctx(imp);
2381 if (KEY_IS(KEY_CACHE_SET)) {
2382 struct client_obd *cli = &obd->u.cli;
2384 LASSERT(cli->cl_cache == NULL); /* only once */
2385 cli->cl_cache = (struct cl_client_cache *)val;
2386 cl_cache_incref(cli->cl_cache);
2387 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2389 /* add this osc into entity list */
2390 LASSERT(list_empty(&cli->cl_lru_osc));
2391 spin_lock(&cli->cl_cache->ccc_lru_lock);
2392 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2393 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2398 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2399 struct client_obd *cli = &obd->u.cli;
/* Shrink at most half of the LRU, bounded by the requested target. */
2400 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2401 long target = *(long *)val;
2403 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* All remaining keys go over the wire; they need a set to ride on,
 * except GRANT_SHRINK which is queued on ptlrpcd below. */
2408 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2411 /* We pass all other commands directly to OST. Since nobody calls osc
2412 methods directly and everybody is supposed to go through LOV, we
2413 assume lov checked invalid values for us.
2414 The only recognised values so far are evict_by_nid and mds_conn.
2415 Even if something bad goes through, we'd get a -EINVAL from OST
2418 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2419 &RQF_OST_SET_GRANT_INFO :
2424 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2425 RCL_CLIENT, keylen);
2426 if (!KEY_IS(KEY_GRANT_SHRINK))
2427 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2428 RCL_CLIENT, vallen);
2429 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2431 ptlrpc_request_free(req);
2435 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2436 memcpy(tmp, key, keylen);
2437 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2440 memcpy(tmp, val, vallen);
2442 if (KEY_IS(KEY_GRANT_SHRINK)) {
2443 struct osc_grant_args *aa;
2446 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2447 aa = ptlrpc_req_async_args(req);
2450 ptlrpc_req_finished(req);
/* Stash the caller's obdo for the grant-shrink interpret callback. */
2453 *oa = ((struct ost_body *)val)->oa;
2455 req->rq_interpret_reply = osc_shrink_grant_interpret;
2458 ptlrpc_request_set_replen(req);
2459 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2460 LASSERT(set != NULL);
2461 ptlrpc_set_add_req(set, req);
2462 ptlrpc_check_set(NULL, set);
2464 ptlrpcd_add_req(req);
/*
 * osc_reconnect() - recompute the grant to request from the OST when
 * the import reconnects.  The requested grant covers currently
 * available + reserved grant (plus dirty pages/grant, depending on
 * OBD_CONNECT_GRANT_PARAM), falling back to 2x the BRW size when zero.
 * Lost grant is reported once and then cleared.
 *
 * NOTE(review): some lines (declarations of grant/lost_grant, returns)
 * are missing from this extract.
 */
2470 static int osc_reconnect(const struct lu_env *env,
2471 struct obd_export *exp, struct obd_device *obd,
2472 struct obd_uuid *cluuid,
2473 struct obd_connect_data *data,
2476 struct client_obd *cli = &obd->u.cli;
2478 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
/* Grant fields are protected by cl_loi_list_lock. */
2482 spin_lock(&cli->cl_loi_list_lock);
2483 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
/* With GRANT_PARAM the server accounts dirty data in grant bytes;
 * otherwise count dirty pages converted to bytes. */
2484 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2485 grant += cli->cl_dirty_grant;
2487 grant += cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
/* Never ask for zero grant: default to two full BRW RPCs worth. */
2488 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2489 lost_grant = cli->cl_lost_grant;
2490 cli->cl_lost_grant = 0;
2491 spin_unlock(&cli->cl_loi_list_lock);
2493 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2494 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2495 data->ocd_version, data->ocd_grant, lost_grant);
/*
 * osc_disconnect() - disconnect the client export, then remove this
 * client from the grant-shrink list only after the import is known to
 * be destroyed (see the BUG18662 race description below).
 */
2501 static int osc_disconnect(struct obd_export *exp)
2503 struct obd_device *obd = class_exp2obd(exp);
2506 rc = client_disconnect_export(exp);
2508 * Initially we put del_shrink_grant before disconnect_export, but it
2509 * causes the following problem if setup (connect) and cleanup
2510 * (disconnect) are tangled together.
2511 * connect p1 disconnect p2
2512 * ptlrpc_connect_import
2513 * ............... class_manual_cleanup
2516 * ptlrpc_connect_interrupt
2518 * add this client to shrink list
2520 * Bang! pinger trigger the shrink.
2521 * So the osc should be disconnected from the shrink list, after we
2522 * are sure the import has been destroyed. BUG18662
/* cl_import == NULL is the signal that the import is fully gone. */
2524 if (obd->u.cli.cl_import == NULL)
2525 osc_del_shrink_grant(&obd->u.cli);
/*
 * osc_ldlm_resource_invalidate() - cfs_hash iterator callback run on
 * each LDLM resource during import invalidation.  Finds the osc object
 * attached to the first granted lock (taking a cl_object reference),
 * clears LDLM_FL_CLEANED on granted locks so the second namespace
 * cleanup pass can cancel them, then invalidates the osc object and
 * drops the reference.
 *
 * NOTE(review): loop/brace closing lines are missing from this extract.
 */
2529 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2530 struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2532 struct lu_env *env = arg;
2533 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2534 struct ldlm_lock *lock;
2535 struct osc_object *osc = NULL;
2539 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
/* Grab the osc object from the first lock that carries ast_data. */
2540 if (lock->l_ast_data != NULL && osc == NULL) {
2541 osc = lock->l_ast_data;
2542 cl_object_get(osc2cl(osc));
2545 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2546 * by the 2nd round of ldlm_namespace_clean() call in
2547 * osc_import_event(). */
2548 ldlm_clear_cleaned(lock);
/* Invalidate the object and release the reference taken above. */
2553 osc_object_invalidate(env, osc);
2554 cl_object_put(env, osc2cl(osc));
/*
 * osc_import_event() - react to import state-machine events.
 * DISCON zeroes grant accounting; INVALIDATE cleans the LDLM namespace
 * (twice, around invalidating cached osc objects); OCD re-initialises
 * grant and the request portal from the negotiated connect data; the
 * remaining events are forwarded to the observer obd.
 *
 * NOTE(review): break statements, closing braces and the switch header
 * are missing from this extract.
 */
2560 static int osc_import_event(struct obd_device *obd,
2561 struct obd_import *imp,
2562 enum obd_import_event event)
2564 struct client_obd *cli;
2568 LASSERT(imp->imp_obd == obd);
/* Connection lost: all grant is void until the next connect. */
2571 case IMP_EVENT_DISCON: {
2573 spin_lock(&cli->cl_loi_list_lock);
2574 cli->cl_avail_grant = 0;
2575 cli->cl_lost_grant = 0;
2576 spin_unlock(&cli->cl_loi_list_lock);
2579 case IMP_EVENT_INACTIVE: {
2580 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
/* Invalidate: drop local LDLM state and cached objects. */
2583 case IMP_EVENT_INVALIDATE: {
2584 struct ldlm_namespace *ns = obd->obd_namespace;
/* First pass: reap locks not pinned by cached state. */
2588 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2590 env = cl_env_get(&refcheck);
/* Flush pending I/O, then invalidate every cached osc object. */
2592 osc_io_unplug(env, &obd->u.cli, NULL);
2594 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2595 osc_ldlm_resource_invalidate,
2597 cl_env_put(env, &refcheck);
/* Second pass: cancel the locks un-flagged by the iterator above. */
2599 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2604 case IMP_EVENT_ACTIVE: {
2605 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
/* Connect data negotiated: pick up grant and portal settings. */
2608 case IMP_EVENT_OCD: {
2609 struct obd_connect_data *ocd = &imp->imp_connect_data;
2611 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2612 osc_init_grant(&obd->u.cli, ocd);
2615 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2616 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2618 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2621 case IMP_EVENT_DEACTIVATE: {
2622 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2625 case IMP_EVENT_ACTIVATE: {
2626 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2630 CERROR("Unknown import event %d\n", event);
2637 * Determine whether the lock can be canceled before replaying the lock
2638 * during recovery, see bug16774 for detailed information.
2640 * \retval zero the lock can't be canceled
2641 * \retval other ok to cancel
2643 static int osc_cancel_weight(struct ldlm_lock *lock)
2646 * Cancel all unused and granted extent lock.
/* Extent lock, fully granted, and weighs nothing (no pages pinned). */
2648 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2649 lock->l_granted_mode == lock->l_req_mode &&
2650 osc_ldlm_weigh_ast(lock) == 0)
/*
 * brw_queue_work() - ptlrpcd work callback: flush pending writeback
 * (cached dirty pages) for the given client obd.
 */
2656 static int brw_queue_work(const struct lu_env *env, void *data)
2658 struct client_obd *cli = data;
2660 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2662 osc_io_unplug(env, cli, NULL);
/*
 * osc_setup() - set up an OSC obd device.
 * Sequence visible here: take a ptlrpcd reference, do generic client
 * setup, allocate writeback and LRU ptlrpcd work items, set up quota,
 * register procfs entries (under OSP's proc dir when co-located with
 * osp), grow the shared request pool toward its cap, register the
 * cancel-weight callback and join the global shrink list.
 * Error paths unwind via the labels at the bottom.
 *
 * NOTE(review): GOTO/return lines and some braces are missing from
 * this extract.
 */
2666 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2668 struct client_obd *cli = &obd->u.cli;
2669 struct obd_type *type;
2677 rc = ptlrpcd_addref();
2681 rc = client_obd_setup(obd, lcfg);
2683 GOTO(out_ptlrpcd, rc);
/* Dedicated work item to flush dirty pages asynchronously. */
2685 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2686 if (IS_ERR(handler))
2687 GOTO(out_client_setup, rc = PTR_ERR(handler));
2688 cli->cl_writeback_work = handler;
/* Dedicated work item for LRU page reclaim. */
2690 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2691 if (IS_ERR(handler))
2692 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2693 cli->cl_lru_work = handler;
2695 rc = osc_quota_setup(obd);
2697 GOTO(out_ptlrpcd_work, rc);
2699 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2701 #ifdef CONFIG_PROC_FS
2702 obd->obd_vars = lprocfs_osc_obd_vars;
2704 /* If this is true then both client (osc) and server (osp) are on the
2705 * same node. The osp layer if loaded first will register the osc proc
2706 * directory. In that case this obd_device will be attached its proc
2707 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2708 type = class_search_type(LUSTRE_OSP_NAME);
2709 if (type && type->typ_procsym) {
2710 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2712 obd->obd_vars, obd);
/* proc registration failure is non-fatal: log and continue. */
2713 if (IS_ERR(obd->obd_proc_entry)) {
2714 rc = PTR_ERR(obd->obd_proc_entry);
2715 CERROR("error %d setting up lprocfs for %s\n", rc,
2717 obd->obd_proc_entry = NULL;
2720 rc = lprocfs_obd_setup(obd);
2723 /* If the basic OSC proc tree construction succeeded then
2724 * lets do the rest. */
2726 lproc_osc_attach_seqstat(obd);
2727 sptlrpc_lprocfs_cliobd_attach(obd);
2728 ptlrpc_lprocfs_register_obd(obd);
2732 * We try to control the total number of requests with a upper limit
2733 * osc_reqpool_maxreqcount. There might be some race which will cause
2734 * over-limit allocation, but it is fine.
2736 req_count = atomic_read(&osc_pool_req_count);
2737 if (req_count < osc_reqpool_maxreqcount) {
2738 adding = cli->cl_max_rpcs_in_flight + 2;
2739 if (req_count + adding > osc_reqpool_maxreqcount)
2740 adding = osc_reqpool_maxreqcount - req_count;
2742 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2743 atomic_add(added, &osc_pool_req_count);
2746 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
/* Let LDLM ask us which locks are cheap to cancel before replay. */
2747 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
/* Make this client visible to the global memory-shrinker list. */
2749 spin_lock(&osc_shrink_lock);
2750 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2751 spin_unlock(&osc_shrink_lock);
/* --- error unwind labels below (label lines not visible) --- */
2756 if (cli->cl_writeback_work != NULL) {
2757 ptlrpcd_destroy_work(cli->cl_writeback_work);
2758 cli->cl_writeback_work = NULL;
2760 if (cli->cl_lru_work != NULL) {
2761 ptlrpcd_destroy_work(cli->cl_lru_work);
2762 cli->cl_lru_work = NULL;
2765 client_obd_cleanup(obd);
/*
 * osc_precleanup() - first stage of obd teardown: wait for zombie
 * exports, destroy the writeback/LRU work items, then tear down the
 * client import and proc entries.
 */
2771 static int osc_precleanup(struct obd_device *obd)
2773 struct client_obd *cli = &obd->u.cli;
2777 * for echo client, export may be on zombie list, wait for
2778 * zombie thread to cull it, because cli.cl_import will be
2779 * cleared in client_disconnect_export():
2780 * class_export_destroy() -> obd_cleanup() ->
2781 * echo_device_free() -> echo_client_cleanup() ->
2782 * obd_disconnect() -> osc_disconnect() ->
2783 * client_disconnect_export()
2785 obd_zombie_barrier();
2786 if (cli->cl_writeback_work) {
2787 ptlrpcd_destroy_work(cli->cl_writeback_work);
2788 cli->cl_writeback_work = NULL;
2791 if (cli->cl_lru_work) {
2792 ptlrpcd_destroy_work(cli->cl_lru_work);
2793 cli->cl_lru_work = NULL;
2796 obd_cleanup_client_import(obd);
2797 ptlrpc_lprocfs_unregister_obd(obd);
2798 lprocfs_obd_cleanup(obd);
/*
 * osc_cleanup() - final obd teardown: leave the global shrink list,
 * detach from the shared client cache (dropping our reference), free
 * the quota cache, and finish generic client cleanup.
 */
2802 int osc_cleanup(struct obd_device *obd)
2804 struct client_obd *cli = &obd->u.cli;
2809 spin_lock(&osc_shrink_lock);
2810 list_del(&cli->cl_shrink_list);
2811 spin_unlock(&osc_shrink_lock);
/* Undo the KEY_CACHE_SET registration from osc_set_info_async(). */
2814 if (cli->cl_cache != NULL) {
2815 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2816 spin_lock(&cli->cl_cache->ccc_lru_lock);
2817 list_del_init(&cli->cl_lru_osc);
2818 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2819 cli->cl_lru_left = NULL;
2820 cl_cache_decref(cli->cl_cache);
2821 cli->cl_cache = NULL;
2824 /* free memory of osc quota cache */
2825 osc_quota_cleanup(obd);
2827 rc = client_obd_cleanup(obd);
/*
 * osc_process_config_base() - apply a config log record as a proc
 * parameter update; positive return codes are mapped to success.
 */
2833 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2835 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2836 return rc > 0 ? 0: rc;
/* obd_ops wrapper: forward config buffer to osc_process_config_base(). */
2839 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2841 return osc_process_config_base(obd, buf);
/*
 * osc_obd_ops - the OSC method table registered with the obd class.
 * Connection management is mostly delegated to generic client_* helpers;
 * OSC-specific entry points are defined in this file / module.
 */
2844 static struct obd_ops osc_obd_ops = {
2845 .o_owner = THIS_MODULE,
2846 .o_setup = osc_setup,
2847 .o_precleanup = osc_precleanup,
2848 .o_cleanup = osc_cleanup,
2849 .o_add_conn = client_import_add_conn,
2850 .o_del_conn = client_import_del_conn,
2851 .o_connect = client_connect_import,
2852 .o_reconnect = osc_reconnect,
2853 .o_disconnect = osc_disconnect,
2854 .o_statfs = osc_statfs,
2855 .o_statfs_async = osc_statfs_async,
2856 .o_create = osc_create,
2857 .o_destroy = osc_destroy,
2858 .o_getattr = osc_getattr,
2859 .o_setattr = osc_setattr,
2860 .o_iocontrol = osc_iocontrol,
2861 .o_set_info_async = osc_set_info_async,
2862 .o_import_event = osc_import_event,
2863 .o_process_config = osc_process_config,
2864 .o_quotactl = osc_quotactl,
/* Memory-shrinker state: the registered shrinker handle plus the global
 * list of client obds eligible for cache shrinking and its lock. */
2867 static struct shrinker *osc_cache_shrinker;
2868 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
2869 DEFINE_SPINLOCK(osc_shrink_lock);
/*
 * Compatibility wrapper for kernels without the split count/scan
 * shrinker API: emulate it with the legacy single-callback interface
 * by scanning first and then returning the remaining count.
 */
2871 #ifndef HAVE_SHRINKER_COUNT
2872 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2874 struct shrink_control scv = {
2875 .nr_to_scan = shrink_param(sc, nr_to_scan),
2876 .gfp_mask = shrink_param(sc, gfp_mask)
2878 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2879 struct shrinker *shrinker = NULL;
2882 (void)osc_cache_shrink_scan(shrinker, &scv);
2884 return osc_cache_shrink_count(shrinker, &scv);
/*
 * osc_init() - module entry point.  Initialises the lu_kmem caches,
 * registers the OSC obd type (proc disabled when co-located with OSP),
 * installs the cache shrinker, and sizes/creates the shared request
 * pool from the osc_reqpool_mem_max module parameter.
 *
 * NOTE(review): return/GOTO lines and labels are partially missing
 * from this extract.
 */
2888 static int __init osc_init(void)
2890 bool enable_proc = true;
2891 struct obd_type *type;
2892 unsigned int reqpool_size;
2893 unsigned int reqsize;
2895 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
2896 osc_cache_shrink_count, osc_cache_shrink_scan);
2899 /* print an address of _any_ initialized kernel symbol from this
2900 * module, to allow debugging with gdb that doesn't support data
2901 * symbols from modules.*/
2902 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2904 rc = lu_kmem_init(osc_caches);
/* If OSP already registered a proc dir, skip our own (shared node). */
2908 type = class_search_type(LUSTRE_OSP_NAME);
2909 if (type != NULL && type->typ_procsym != NULL)
2910 enable_proc = false;
2912 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2913 LUSTRE_OSC_NAME, &osc_device_type);
2917 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
2919 /* This is obviously too much memory, only prevent overflow here */
/* Reject 0 and anything >= 4 GiB (would overflow the <<20 below). */
2920 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
2921 GOTO(out_type, rc = -EINVAL);
2923 reqpool_size = osc_reqpool_mem_max << 20;
/* Round the per-request size up to the next power of two. */
2926 while (reqsize < OST_IO_MAXREQSIZE)
2927 reqsize = reqsize << 1;
2930 * We don't enlarge the request count in OSC pool according to
2931 * cl_max_rpcs_in_flight. The allocation from the pool will only be
2932 * tried after normal allocation failed. So a small OSC pool won't
2933 * cause much performance degression in most of cases.
2935 osc_reqpool_maxreqcount = reqpool_size / reqsize;
2937 atomic_set(&osc_pool_req_count, 0);
2938 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
2939 ptlrpc_add_rqs_to_pool);
2941 if (osc_rq_pool != NULL)
/* --- error unwind (labels not fully visible) --- */
2945 class_unregister_type(LUSTRE_OSC_NAME);
2947 lu_kmem_fini(osc_caches);
/*
 * osc_exit() - module exit: undo osc_init() in reverse order
 * (shrinker, obd type, kmem caches, request pool).
 */
2952 static void __exit osc_exit(void)
2954 remove_shrinker(osc_cache_shrinker);
2955 class_unregister_type(LUSTRE_OSC_NAME);
2956 lu_kmem_fini(osc_caches);
2957 ptlrpc_free_rq_pool(osc_rq_pool);
/* Standard kernel module metadata and entry/exit registration. */
2960 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2961 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2962 MODULE_VERSION(LUSTRE_VERSION_STRING);
2963 MODULE_LICENSE("GPL");
2965 module_init(osc_init);
2966 module_exit(osc_exit);