4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_OSC
35 #include <linux/workqueue.h>
36 #include <lprocfs_status.h>
37 #include <lustre_debug.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_ha.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42 #include <lustre_net.h>
43 #include <lustre_obdo.h>
44 #include <uapi/linux/lustre/lustre_param.h>
46 #include <obd_cksum.h>
47 #include <obd_class.h>
48 #include <lustre_osc.h>
50 #include "osc_internal.h"
/* Shared OST request-pool state; consumed by other OSC translation units. */
atomic_t osc_pool_req_count;            /* requests currently in the pool */
unsigned int osc_reqpool_maxreqcount;   /* upper bound on pooled requests */
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);
60 static int osc_idle_timeout = 20;
61 module_param(osc_idle_timeout, uint, 0644);
/* Grant-shrink RPCs reuse the async-args layout of BRW RPCs. */
#define osc_grant_args osc_brw_async_args

/* Async-args cookie carried by OST_SETATTR/OST_PUNCH requests; consumed by
 * osc_setattr_interpret().
 * NOTE(review): additional members elided in this excerpt. */
struct osc_setattr_args {
	obd_enqueue_update_f sa_upcall;

/* Async-args cookie carried by OST_SYNC requests; consumed by
 * osc_sync_interpret().
 * NOTE(review): additional members elided in this excerpt. */
struct osc_fsync_args {
	struct osc_object *fa_obj;
	obd_enqueue_update_f fa_upcall;

/* Async-args cookie carried by OST_LADVISE requests; consumed by
 * osc_ladvise_interpret().
 * NOTE(review): additional members elided in this excerpt. */
struct osc_ladvise_args {
	obd_enqueue_update_f la_upcall;

/* Forward declarations for the BRW path defined later in this file. */
static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/* Pack @oa into the request's OST body in wire format, converting through
 * the import's connect data. */
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
	struct ost_body *body;

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/* Synchronous OST_GETATTR: fetch current attributes of the object named by
 * @oa from the OST and copy them back into @oa.
 * NOTE(review): allocation/rc error checks elided in this excerpt. */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
	struct ptlrpc_request *req;
	struct ost_body *body;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
		ptlrpc_request_free(req);

	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	/* Send and wait for the reply. */
	rc = ptlrpc_queue_wait(req);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
		/* missing reply body is a protocol error */
		GOTO(out, rc = -EPROTO);

	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
	/* Convert the wire obdo from the reply back into the caller's @oa. */
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	/* Report the client's preferred BRW size as the I/O blocksize. */
	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: push the attributes in @oa to the OST and read
 * the updated attributes back into @oa.
 * NOTE(review): allocation/rc error checks elided in this excerpt. */
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
	struct ptlrpc_request *req;
	struct ost_body *body;

	/* callers must always provide the object group */
	LASSERT(oa->o_valid & OBD_MD_FLGROUP);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
		ptlrpc_request_free(req);

	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
		/* missing reply body is a protocol error */
		GOTO(out, rc = -EPROTO);

	/* Reflect server-side result back into the caller's @oa. */
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	ptlrpc_req_finished(req);
/* Reply interpreter for async OST_SETATTR/OST_PUNCH: unpack the reply obdo
 * into sa->sa_oa and invoke the caller's upcall with the final rc.
 * NOTE(review): rc checks elided in this excerpt. */
static int osc_setattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_setattr_args *sa, int rc)
	struct ost_body *body;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
		/* missing reply body is a protocol error */
		GOTO(out, rc = -EPROTO);

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,

	/* hand the outcome back to the layer that issued the setattr */
	rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR: pack @oa, register @upcall/@cookie as the
 * completion callback, and queue the request either on ptlrpcd or on the
 * caller-supplied @rqset.
 * NOTE(review): branch structure (rqset == NULL vs. PTLRPCD_SET vs. set)
 * is partially elided in this excerpt. */
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
		      obd_enqueue_update_f upcall, void *cookie,
		      struct ptlrpc_request_set *rqset)
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
		ptlrpc_request_free(req);

	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	/* do mds to ost setattr asynchronously */
		/* Do not wait for response. */
		ptlrpcd_add_req(req);

		req->rq_interpret_reply =
			(ptlrpc_interpterer_t)osc_setattr_interpret;

		/* completion context must fit in the request's async args */
		CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
		sa = ptlrpc_req_async_args(req);
		sa->sa_upcall = upcall;
		sa->sa_cookie = cookie;

		if (rqset == PTLRPCD_SET)
			ptlrpcd_add_req(req);
			ptlrpc_set_add_req(rqset, req);
/* Reply interpreter for OST_LADVISE: copy the reply obdo back to the caller
 * and fire the registered upcall.
 * NOTE(review): rc checks elided in this excerpt. */
static int osc_ladvise_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
	struct osc_ladvise_args *la = arg;
	struct ost_body *body;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
		/* missing reply body is a protocol error */
		GOTO(out, rc = -EPROTO);

	*la->la_oa = body->oa;

	rc = la->la_upcall(la->la_cookie, rc);
/*
 * Send an OST_LADVISE request carrying @num_advise ladvise entries.
 *
 * If rqset is NULL, do not wait for response. Upcall and cookie could also
 * be NULL in this case.
 *
 * NOTE(review): allocation/rc error checks and the rqset branch structure
 * are partially elided in this excerpt.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
		     struct ladvise_hdr *ladvise_hdr,
		     obd_enqueue_update_f upcall, void *cookie,
		     struct ptlrpc_request_set *rqset)
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct osc_ladvise_args *la;
	struct lu_ladvise *req_ladvise;
	struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
	int num_advise = ladvise_hdr->lah_count;
	struct ladvise_hdr *req_ladvise_hdr;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);

	/* size the LADVISE buffer for the variable-length advice array */
	req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
			     num_advise * sizeof(*ladvise));
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
		ptlrpc_request_free(req);

	req->rq_request_portal = OST_IO_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,

	/* copy the header and the advice array into the request capsule */
	req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
						 &RMF_OST_LADVISE_HDR);
	memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

	req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
	memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
	ptlrpc_request_set_replen(req);

		/* Do not wait for response. */
		ptlrpcd_add_req(req);

		req->rq_interpret_reply = osc_ladvise_interpret;
		/* completion context must fit in the request's async args */
		CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
		la = ptlrpc_req_async_args(req);
		la->la_upcall = upcall;
		la->la_cookie = cookie;

		if (rqset == PTLRPCD_SET)
			ptlrpcd_add_req(req);
			ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_CREATE, used only for echo-client objects (see the
 * fid_seq_is_echo assertion): create the object named by @oa and read the
 * resulting attributes back.
 * NOTE(review): allocation/rc error checks elided in this excerpt. */
static int osc_create(const struct lu_env *env, struct obd_export *exp,
	struct ptlrpc_request *req;
	struct ost_body *body;

	LASSERT(oa->o_valid & OBD_MD_FLGROUP);
	/* regular object creation goes through lod/osp; only echo uses this */
	LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
		GOTO(out, rc = -ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
		ptlrpc_request_free(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
		/* missing reply body is a protocol error */
		GOTO(out_req, rc = -EPROTO);

	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	CDEBUG(D_HA, "transno: %lld\n",
	       lustre_msg_get_transno(req->rq_repmsg));
	ptlrpc_req_finished(req);
/* Send an asynchronous OST_PUNCH (truncate/fallocate-punch) for @oa via
 * ptlrpcd; @upcall/@cookie are invoked from osc_setattr_interpret() when
 * the reply arrives.
 * NOTE(review): allocation/rc error checks elided in this excerpt. */
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
		   obd_enqueue_update_f upcall, void *cookie)
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	struct obd_import *imp = class_exp2cliimp(exp);
	struct ost_body *body;

	req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
		ptlrpc_request_free(req);

	/* punch is served on the OST I/O portal, not the regular one */
	osc_set_io_portal(req);

	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

	lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	/* punch completion reuses the setattr interpreter/async args */
	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
	CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
	sa = ptlrpc_req_async_args(req);
	sa->sa_upcall = upcall;
	sa->sa_cookie = cookie;

	ptlrpcd_add_req(req);
EXPORT_SYMBOL(osc_punch_send);
/* Reply interpreter for OST_SYNC: copy the reply obdo back, refresh the osc
 * object's blocks attribute under the attr lock, and fire the upcall.
 * NOTE(review): rc checks elided in this excerpt. */
static int osc_sync_interpret(const struct lu_env *env,
			      struct ptlrpc_request *req,
	struct osc_fsync_args *fa = arg;
	struct ost_body *body;
	struct cl_attr *attr = &osc_env_info(env)->oti_attr;
	unsigned long valid = 0;
	struct cl_object *obj;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
		CERROR("can't unpack ost_body\n");
		GOTO(out, rc = -EPROTO);

	*fa->fa_oa = body->oa;
	obj = osc2cl(fa->fa_obj);

	/* Update osc object's blocks attribute */
	cl_object_attr_lock(obj);
	if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
		attr->cat_blocks = body->oa.o_blocks;

		cl_object_attr_update(env, obj, attr, valid);
	cl_object_attr_unlock(obj);

	rc = fa->fa_upcall(fa->fa_cookie, rc);
/* Issue an OST_SYNC for @obj; the byte range travels in @oa (see the
 * "overload" comment below). @upcall/@cookie run from osc_sync_interpret().
 * NOTE(review): allocation/rc error checks and rqset branch structure are
 * partially elided in this excerpt. */
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
		  obd_enqueue_update_f upcall, void *cookie,
		  struct ptlrpc_request_set *rqset)
	struct obd_export *exp = osc_export(obj);
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct osc_fsync_args *fa;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
		ptlrpc_request_free(req);

	/* overload the size and blocks fields in the oa with start/end */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = osc_sync_interpret;

	/* completion context must fit in the request's async args */
	CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
	fa = ptlrpc_req_async_args(req);
	fa->fa_upcall = upcall;
	fa->fa_cookie = cookie;

	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req);
		ptlrpc_set_add_req(rqset, req);
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
				   struct list_head *cancels,
				   enum ldlm_mode mode, __u64 lock_flags)
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;

	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
	 * export) but disabled through procfs (flag in NS).
	 *
	 * This distinguishes from a case when ELC is not supported originally,
	 * when we still want to cancel locks in advance and just cancel them
	 * locally, without sending any RPC. */
	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))

	/* locate the LDLM resource for this object id */
	ostid_build_res_name(&oa->o_oi, &res_id);
	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);

	LDLM_RESOURCE_ADDREF(res);
	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
					   lock_flags, 0, NULL);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
/* Reply interpreter for OST_DESTROY: release one in-flight destroy slot and
 * wake anyone throttled in osc_destroy(). */
static int osc_destroy_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req, void *data,
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

	atomic_dec(&cli->cl_destroy_in_flight);
	wake_up(&cli->cl_destroy_waitq);
/* Try to reserve an in-flight destroy slot, bounded by
 * cl_max_rpcs_in_flight. On failure the optimistic increment is undone and
 * waiters are woken if the count dropped in the meantime.
 * NOTE(review): return statements elided in this excerpt. */
static int osc_can_send_destroy(struct client_obd *cli)
	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
	    cli->cl_max_rpcs_in_flight) {
		/* The destroy request can be sent */

	/* slot not available: undo the optimistic increment */
	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
	    cli->cl_max_rpcs_in_flight) {
		/*
		 * The counter has been modified between the two atomic
		 * operations, so wake any waiter.
		 */
		wake_up(&cli->cl_destroy_waitq);
/* Send an OST_DESTROY for the object in @oa, cancelling matching local PW
 * locks first (early lock cancellation piggybacked on the RPC) and
 * throttling the number of concurrent destroy RPCs.
 * NOTE(review): allocation/rc error checks elided in this excerpt. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
	struct client_obd *cli = &exp->exp_obd->u.cli;
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct list_head cancels = LIST_HEAD_INIT(cancels);

		CDEBUG(D_INFO, "oa NULL\n");

	/* collect local PW locks to cancel along with the destroy */
	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
					LDLM_FL_DISCARD_DATA);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
		ldlm_lock_list_put(&cancels, l_bl_ast, count);

	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
		ptlrpc_request_free(req);

	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = osc_destroy_interpret;
	if (!osc_can_send_destroy(cli)) {
		struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

		/*
		 * Wait until the number of on-going destroy RPCs drops
		 * under max_rpc_in_flight
		 */
		rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
					    osc_can_send_destroy(cli), &lwi);
			ptlrpc_req_finished(req);

	/* Do not wait for response */
	ptlrpcd_add_req(req);
/* Fill the dirty/undirty/grant accounting fields of @oa under
 * cl_loi_list_lock, so the server learns how much cache/grant this client
 * holds and how much more it wants.
 * NOTE(review): some branch/closing lines elided in this excerpt. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
	u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

	/* these fields must not already be claimed valid by the caller */
	LASSERT(!(oa->o_valid & bits));

	spin_lock(&cli->cl_loi_list_lock);
	if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
		oa->o_dirty = cli->cl_dirty_grant;
		oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
	/* sanity checks on the dirty accounting */
	if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
		CERROR("dirty %lu > dirty_max %lu\n",
		       cli->cl_dirty_max_pages);
	} else if (unlikely(atomic_long_read(&obd_dirty_pages) >
			    (long)(obd_max_dirty_pages + 1))) {
		/* The atomic_read() allowing the atomic_inc() are
		 * not covered by a lock thus they may safely race and trip
		 * this CERROR() unless we add in a small fudge factor (+1). */
		CERROR("%s: dirty %ld > system dirty_max %ld\n",
		       cli_name(cli), atomic_long_read(&obd_dirty_pages),
		       obd_max_dirty_pages);
	} else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
		CERROR("dirty %lu - dirty_max %lu too big???\n",
		       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
		unsigned long nrpages;
		unsigned long undirty;

		/* how much we could still dirty: a full pipeline of RPCs */
		nrpages = cli->cl_max_pages_per_rpc;
		nrpages *= cli->cl_max_rpcs_in_flight + 1;
		nrpages = max(nrpages, cli->cl_dirty_max_pages);
		undirty = nrpages << PAGE_SHIFT;
		if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
			/* take extent tax into account when asking for more
			 * grant space */
			nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
				    cli->cl_max_extent_pages;
			undirty += nrextents * cli->cl_grant_extent_tax;
		/* Do not ask for more than OBD_MAX_GRANT - a margin for server
		 * to add extent tax, etc.
		 */
		oa->o_undirty = min(undirty, OBD_MAX_GRANT &
				    ~(PTLRPC_MAX_BRW_SIZE * 4UL));

	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
	oa->o_dropped = cli->cl_lost_grant;
	cli->cl_lost_grant = 0;
	spin_unlock(&cli->cl_loi_list_lock);
	CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Re-arm the grant-shrink deadline one interval from now. */
void osc_update_next_shrink(struct client_obd *cli)
	cli->cl_next_shrink_grant = ktime_get_seconds() +
				    cli->cl_grant_shrink_interval;

	CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
	       cli->cl_next_shrink_grant);
/* Add @grant bytes back to the client's available grant, under the
 * loi list lock. */
static void __osc_update_grant(struct client_obd *cli, u64 grant)
	spin_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant += grant;
	spin_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server returned in an RPC reply body. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
	if (body->oa.o_valid & OBD_MD_FLGRANT) {
		CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
		__osc_update_grant(cli, body->oa.o_grant);
/*
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
	struct list_head gtd_clients;   /* client_obd's registered for shrink */
	struct mutex gtd_mutex;         /* protects gtd_clients */
	unsigned long gtd_stopped:1;    /* set to stop the work item */

/* single instance driving grant shrink for all clients */
static struct grant_thread_data client_gtd;
/* Reply interpreter for a grant-shrink RPC: on failure, return the shrunk
 * grant locally; otherwise absorb whatever grant the server replied with.
 * Frees the obdo that was allocated for the request.
 * NOTE(review): rc checks elided in this excerpt. */
static int osc_shrink_grant_interpret(const struct lu_env *env,
				      struct ptlrpc_request *req,
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
	struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
	struct ost_body *body;

	/* shrink failed: give the grant back to ourselves */
	__osc_update_grant(cli, oa->o_grant);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);

	osc_update_grant(cli, body);

	OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
/* Prepare @oa to return a quarter of the available grant to the server and
 * deduct it locally; marks the obdo with OBD_FL_SHRINK_GRANT. */
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
	spin_lock(&cli->cl_loi_list_lock);
	oa->o_grant = cli->cl_avail_grant / 4;
	cli->cl_avail_grant -= oa->o_grant;
	spin_unlock(&cli->cl_loi_list_lock);
	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
		oa->o_valid |= OBD_MD_FLFLAGS;

	oa->o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);
/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
			     (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

	spin_lock(&cli->cl_loi_list_lock);
	/* already at/below the pipeline target: fall back to one RPC's worth */
	if (cli->cl_avail_grant <= target_bytes)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
	spin_unlock(&cli->cl_loi_list_lock);

	return osc_shrink_grant_to_target(cli, target_bytes);
/* Shrink available grant down to @target_bytes and hand the excess back to
 * the server through a KEY_GRANT_SHRINK set_info RPC.
 * NOTE(review): allocation and some closing lines elided in this excerpt. */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
	struct ost_body *body;

	spin_lock(&cli->cl_loi_list_lock);
	/* Don't shrink if we are already above or below the desired limit
	 * We don't want to shrink below a single RPC, as that will negatively
	 * impact block allocation and long-term performance. */
	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

	if (target_bytes >= cli->cl_avail_grant) {
		spin_unlock(&cli->cl_loi_list_lock);
	spin_unlock(&cli->cl_loi_list_lock);

	/* announce current cached state in the body we are about to send */
	osc_announce_cached(cli, &body->oa, 0);

	spin_lock(&cli->cl_loi_list_lock);
	if (target_bytes >= cli->cl_avail_grant) {
		/* available grant has changed since target calculation */
		spin_unlock(&cli->cl_loi_list_lock);
		GOTO(out_free, rc = 0);

	/* give back everything above the target */
	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
	cli->cl_avail_grant = target_bytes;
	spin_unlock(&cli->cl_loi_list_lock);
	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
		body->oa.o_valid |= OBD_MD_FLFLAGS;
		body->oa.o_flags = 0;

	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);

	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
				sizeof(*body), body, NULL);
		/* RPC failed: reclaim the grant we tried to give back */
		__osc_update_grant(cli, body->oa.o_grant);
/* Decide whether @client should send a grant-shrink RPC now: the feature
 * must be enabled, the deadline (minus a 5s fudge) must have passed, the
 * import must be FULL, and we must hold more grant than one RPC needs.
 * NOTE(review): return statements elided in this excerpt. */
static int osc_should_shrink_grant(struct client_obd *client)
	time64_t next_shrink = client->cl_next_shrink_grant;

	if (client->cl_import == NULL)

	if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
	    client->cl_import->imp_grant_shrink_disabled) {
		/* feature off: just keep pushing the deadline forward */
		osc_update_next_shrink(client);

	if (ktime_get_seconds() >= next_shrink - 5) {
		/* Get the current RPC size directly, instead of going via:
		 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
		 * Keep comment here so that it can be found by searching. */
		int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

		if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
		    client->cl_avail_grant > brw_size)
			osc_update_next_shrink(client);
/* Cap on grant-shrink RPCs issued per work-handler pass. */
#define GRANT_SHRINK_RPC_BATCH 100

/* Delayed work item driving periodic grant shrink across all clients. */
static struct delayed_work work;

/* Walk every registered client, shrink grant where due (up to the batch
 * limit), then re-arm the delayed work for the earliest future deadline.
 * NOTE(review): some lines (rpc_sent accounting, closing braces) elided in
 * this excerpt. */
static void osc_grant_work_handler(struct work_struct *data)
	struct client_obd *cli;
	bool init_next_shrink = true;
	time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

	mutex_lock(&client_gtd.gtd_mutex);
	list_for_each_entry(cli, &client_gtd.gtd_clients,
		if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
		    osc_should_shrink_grant(cli)) {
			osc_shrink_grant(cli);

		/* track the earliest still-future shrink deadline */
		if (!init_next_shrink) {
			if (cli->cl_next_shrink_grant < next_shrink &&
			    cli->cl_next_shrink_grant > ktime_get_seconds())
				next_shrink = cli->cl_next_shrink_grant;
			init_next_shrink = false;
			next_shrink = cli->cl_next_shrink_grant;
	mutex_unlock(&client_gtd.gtd_mutex);

	if (client_gtd.gtd_stopped == 1)

	/* re-arm: delayed if the deadline is in the future, immediate else */
	if (next_shrink > ktime_get_seconds())
		schedule_delayed_work(&work, msecs_to_jiffies(
					(next_shrink - ktime_get_seconds()) *
		schedule_work(&work.work);
/*
 * Start grant work for returning grant to server for idle clients.
 */
static int osc_start_grant_work(void)
	client_gtd.gtd_stopped = 0;
	mutex_init(&client_gtd.gtd_mutex);
	INIT_LIST_HEAD(&client_gtd.gtd_clients);

	INIT_DELAYED_WORK(&work, osc_grant_work_handler);
	/* kick off the first pass immediately */
	schedule_work(&work.work);
/* Stop the grant work item and wait for any in-flight pass to finish. */
static void osc_stop_grant_work(void)
	client_gtd.gtd_stopped = 1;
	cancel_delayed_work_sync(&work);
/* Register @client with the global grant-shrink list. */
static void osc_add_grant_list(struct client_obd *client)
	mutex_lock(&client_gtd.gtd_mutex);
	list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
	mutex_unlock(&client_gtd.gtd_mutex);
/* Unregister @client from the grant-shrink list; no-op if never added. */
static void osc_del_grant_list(struct client_obd *client)
	if (list_empty(&client->cl_grant_chain))

	mutex_lock(&client_gtd.gtd_mutex);
	list_del_init(&client->cl_grant_chain);
	mutex_unlock(&client_gtd.gtd_mutex);
/* Initialize this client's grant accounting from the server's connect data
 * @ocd: available grant, extent tax, chunk size and max extent size.
 * NOTE(review): some lines elided in this excerpt. */
void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
	/*
	 * ocd_grant is the total grant amount we're expect to hold: if we've
	 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
	 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
	 * dirty.
	 *
	 * race is tolerable here: if we're evicted, but imp_state already
	 * left EVICTED state, then cl_dirty_pages must be 0 already.
	 */
	spin_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant = ocd->ocd_grant;
	if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
		unsigned long consumed = cli->cl_reserved_grant;

		if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
			consumed += cli->cl_dirty_grant;
			consumed += cli->cl_dirty_pages << PAGE_SHIFT;
		if (cli->cl_avail_grant < consumed) {
			CERROR("%s: granted %ld but already consumed %ld\n",
			       cli_name(cli), cli->cl_avail_grant, consumed);
			cli->cl_avail_grant = 0;
			cli->cl_avail_grant -= consumed;

	if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
		/* overhead for each extent insertion */
		cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
		/* determine the appropriate chunk size used by osc_extent. */
		cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
					  ocd->ocd_grant_blkbits);
		/* max_pages_per_rpc must be chunk aligned */
		chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
		cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
					     ~chunk_mask) & chunk_mask;
		/* determine maximum extent size, in #pages */
		size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
		cli->cl_max_extent_pages = size >> PAGE_SHIFT;
		if (cli->cl_max_extent_pages == 0)
			cli->cl_max_extent_pages = 1;
		/* no GRANT_PARAM: fall back to page-granular defaults */
		cli->cl_grant_extent_tax = 0;
		cli->cl_chunkbits = PAGE_SHIFT;
		cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
	spin_unlock(&cli->cl_loi_list_lock);

	CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
	       "chunk bits: %d cl_max_extent_pages: %d\n",
	       cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
	       cli->cl_max_extent_pages);

	if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
		osc_add_grant_list(cli);
EXPORT_SYMBOL(osc_init_grant);
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet.
 * NOTE(review): kunmap/index-advance lines elided in this excerpt. */
static void handle_short_read(int nob_read, size_t page_count,
			      struct brw_page **pga)
	/* skip bytes read OK */
	while (nob_read > 0) {
		LASSERT (page_count > 0);

		if (pga[i]->count > nob_read) {
			/* EOF inside this page */
			ptr = kmap(pga[i]->pg) +
			      (pga[i]->off & ~PAGE_MASK);
			/* zero the tail of the partially-read page */
			memset(ptr + nob_read, 0, pga[i]->count - nob_read);

		nob_read -= pga[i]->count;

	/* zero remaining pages */
	while (page_count-- > 0) {
		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
		memset(ptr, 0, pga[i]->count);
/* Validate the per-niobuf return codes in a BRW_WRITE reply: fail on a
 * missing/short RC vector, on any negative rc, on any unexpected non-zero
 * rc, and on a bulk transfer length mismatch.
 * NOTE(review): some return statements elided in this excerpt. */
static int check_write_rcs(struct ptlrpc_request *req,
			   int requested_nob, int niocount,
			   size_t page_count, struct brw_page **pga)
	remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
						  sizeof(*remote_rcs) *
	if (remote_rcs == NULL) {
		CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");

	/* return error if any niobuf was in error */
	for (i = 0; i < niocount; i++) {
		if ((int)remote_rcs[i] < 0)
			return(remote_rcs[i]);

		if (remote_rcs[i] != 0) {
			CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
			       i, remote_rcs[i], req);

	/* the bulk must have moved exactly the bytes we asked for */
	if (req->rq_bulk != NULL &&
	    req->rq_bulk->bd_nob_transferred != requested_nob) {
		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
		       req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages may be merged into one niobuf iff they are byte-contiguous
 * and their flags differ only in bits known to be safe to combine. */
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
	if (p1->flag != p2->flag) {
		unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_SYNC |
				  OBD_BRW_ASYNC | OBD_BRW_NOQUOTA |

		/* warn if we try to combine flags that we don't know to be
		 * safe to combine */
		if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
			CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
			      "report this at https://jira.whamcloud.com/\n",
			      p1->flag, p2->flag);

	/* contiguity check: p2 starts exactly where p1 ends */
	return (p1->off + p1->count == p2->off);
#if IS_ENABLED(CONFIG_CRC_T10DIF)
/* Compute a T10-PI style bulk checksum: generate per-sector DIF guard tags
 * for each page with @fn, batch them in a bounce page, and hash the batches
 * with the OBD_CKSUM_T10_TOP algorithm into *@... checksum out-param.
 * NOTE(review): loop-control and cleanup lines elided in this excerpt. */
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
				   size_t pg_count, struct brw_page **pga,
				   int opc, obd_dif_csum_fn *fn,
	struct ahash_request *req;
	/* Used Adler as the default checksum type on top of DIF tags */
	unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
	struct page *__page;
	unsigned char *buffer;
	unsigned int bufsize;
	int used_number = 0;

	LASSERT(pg_count > 0);

	/* bounce page used to accumulate guard tags before hashing */
	__page = alloc_page(GFP_KERNEL);

	req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
		CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
		       obd_name, cfs_crypto_hash_name(cfs_alg), rc);

	buffer = kmap(__page);
	guard_start = (__u16 *)buffer;
	guard_number = PAGE_SIZE / sizeof(*guard_start);
	while (nob > 0 && pg_count > 0) {
		unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (unlikely(i == 0 && opc == OST_READ &&
			     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~PAGE_MASK;

			memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));

		/*
		 * The left guard number should be able to hold checksums of a
		 * whole page.
		 */
		rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
						  pga[i]->off & ~PAGE_MASK,
						  guard_start + used_number,
						  guard_number - used_number,

		used_number += used;
		/* bounce page full: fold the batch into the hash and reuse */
		if (used_number == guard_number) {
			cfs_crypto_hash_update_page(req, __page, 0,
				used_number * sizeof(*guard_start));

		nob -= pga[i]->count;

	/* hash any tags left over from the last partial batch */
	if (used_number != 0)
		cfs_crypto_hash_update_page(req, __page, 0,
			used_number * sizeof(*guard_start));

	bufsize = sizeof(cksum);
	cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))

	__free_page(__page);
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum) \
#endif /* CONFIG_CRC_T10DIF */
/* Compute a plain (non-T10) bulk checksum over the first @nob bytes of the
 * page array with the hash selected by @cksum_type, writing into *@cksum.
 * NOTE(review): loop-control and cleanup lines elided in this excerpt. */
static int osc_checksum_bulk(int nob, size_t pg_count,
			     struct brw_page **pga, int opc,
			     enum cksum_types cksum_type,
	struct ahash_request *req;
	unsigned int bufsize;
	unsigned char cfs_alg = cksum_obd2cfs(cksum_type);

	LASSERT(pg_count > 0);

	req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
		CERROR("Unable to initialize checksum hash %s\n",
		       cfs_crypto_hash_name(cfs_alg));
		return PTR_ERR(req);

	while (nob > 0 && pg_count > 0) {
		unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (i == 0 && opc == OST_READ &&
		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~PAGE_MASK;

			memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));

		cfs_crypto_hash_update_page(req, pga[i]->pg,
					    pga[i]->off & ~PAGE_MASK,
		LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
			       (int)(pga[i]->off & ~PAGE_MASK));

		nob -= pga[i]->count;

	bufsize = sizeof(*cksum);
	cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1271 static int osc_checksum_bulk_rw(const char *obd_name,
1272 enum cksum_types cksum_type,
1273 int nob, size_t pg_count,
1274 struct brw_page **pga, int opc,
1277 obd_dif_csum_fn *fn = NULL;
1278 int sector_size = 0;
1282 obd_t10_cksum2dif(cksum_type, &fn, §or_size);
1285 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1286 opc, fn, sector_size, check_sum);
1288 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
/*
 * Build an OST_READ/OST_WRITE BRW RPC for the pages in @pga.
 *
 * Allocates the request (pool-backed for writes), sizes the capsule
 * fields, decides between a "short I/O" inline buffer and a bulk
 * descriptor, fills the niobuf array (merging contiguous pages), and for
 * writes optionally computes the client-side checksum.  On success the
 * prepared request is returned via @reqp; @resend flags a redo so the
 * server can be told via OBD_FL_RECOV_RESEND.
 *
 * NOTE(review): this listing is elided in places; comments below describe
 * only the visible code.
 */
1295 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1296 u32 page_count, struct brw_page **pga,
1297 struct ptlrpc_request **reqp, int resend)
1299 struct ptlrpc_request *req;
1300 struct ptlrpc_bulk_desc *desc;
1301 struct ost_body *body;
1302 struct obd_ioobj *ioobj;
1303 struct niobuf_remote *niobuf;
1304 int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1305 struct osc_brw_async_args *aa;
1306 struct req_capsule *pill;
1307 struct brw_page *pg_prev;
1309 const char *obd_name = cli->cl_import->imp_obd->obd_name;
/* fault-injection points for testing request-preparation failure paths */
1312 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1313 RETURN(-ENOMEM); /* Recoverable */
1314 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1315 RETURN(-EINVAL); /* Fatal */
/* writes draw from the pre-allocated request pool; reads allocate fresh */
1317 if ((cmd & OBD_BRW_WRITE) != 0) {
1319 req = ptlrpc_request_alloc_pool(cli->cl_import,
1321 &RQF_OST_BRW_WRITE);
1324 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* count distinct niobufs: adjacent pages that can_merge_pages() share one */
1329 for (niocount = i = 1; i < page_count; i++) {
1330 if (!can_merge_pages(pga[i - 1], pga[i]))
1334 pill = &req->rq_pill;
1335 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1337 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1338 niocount * sizeof(*niobuf));
1340 for (i = 0; i < page_count; i++)
1341 short_io_size += pga[i]->count;
1343 /* Check if read/write is small enough to be a short io. */
1344 if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1345 !imp_connect_shortio(cli->cl_import))
1348 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1349 opc == OST_READ ? 0 : short_io_size)
1350 if (opc == OST_READ)
1351 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1354 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1356 ptlrpc_request_free(req);
1359 osc_set_io_portal(req);
1361 ptlrpc_at_set_req_timeout(req);
1362 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1364 req->rq_no_retry_einprogress = 1;
/* short I/O carries the data inline in the request; no bulk descriptor */
1366 if (short_io_size != 0) {
1368 short_io_buf = NULL;
1372 desc = ptlrpc_prep_bulk_imp(req, page_count,
1373 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1374 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1375 PTLRPC_BULK_PUT_SINK) |
1376 PTLRPC_BULK_BUF_KIOV,
1378 &ptlrpc_bulk_kiov_pin_ops);
1381 GOTO(out, rc = -ENOMEM);
1382 /* NB request now owns desc and will free it when it gets freed */
1384 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1385 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1386 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1387 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1389 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1391 /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1392 * and from_kgid(), because they are asynchronous. Fortunately, variable
1393 * oa contains valid o_uid and o_gid in these two operations.
1394 * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1395 * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid breaking
1396 * other process logic */
1397 body->oa.o_uid = oa->o_uid;
1398 body->oa.o_gid = oa->o_gid;
1400 obdo_to_ioobj(oa, ioobj);
1401 ioobj->ioo_bufcnt = niocount;
1402 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1403 * that might be send for this request. The actual number is decided
1404 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1405 * "max - 1" for old client compatibility sending "0", and also so
1406 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1408 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1410 ioobj_max_brw_set(ioobj, 0);
1412 if (short_io_size != 0) {
1413 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1414 body->oa.o_valid |= OBD_MD_FLFLAGS;
1415 body->oa.o_flags = 0;
1417 body->oa.o_flags |= OBD_FL_SHORT_IO;
1418 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
/* for short-I/O writes, the page data is copied into the inline buffer */
1420 if (opc == OST_WRITE) {
1421 short_io_buf = req_capsule_client_get(pill,
1423 LASSERT(short_io_buf != NULL);
1427 LASSERT(page_count > 0);
1429 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1430 struct brw_page *pg = pga[i];
1431 int poff = pg->off & ~PAGE_MASK;
1433 LASSERT(pg->count > 0);
1434 /* make sure there is no gap in the middle of page array */
1435 LASSERTF(page_count == 1 ||
1436 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1437 ergo(i > 0 && i < page_count - 1,
1438 poff == 0 && pg->count == PAGE_SIZE) &&
1439 ergo(i == page_count - 1, poff == 0)),
1440 "i: %d/%d pg: %p off: %llu, count: %u\n",
1441 i, page_count, pg, pg->off, pg->count);
/* pages must arrive in strictly increasing offset order (sorted earlier) */
1442 LASSERTF(i == 0 || pg->off > pg_prev->off,
1443 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1444 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1446 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1447 pg_prev->pg, page_private(pg_prev->pg),
1448 pg_prev->pg->index, pg_prev->off);
1449 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1450 (pg->flag & OBD_BRW_SRVLOCK));
1451 if (short_io_size != 0 && opc == OST_WRITE) {
1452 unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1454 LASSERT(short_io_size >= requested_nob + pg->count);
1455 memcpy(short_io_buf + requested_nob,
1458 ll_kunmap_atomic(ptr, KM_USER0);
1459 } else if (short_io_size == 0) {
1460 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1463 requested_nob += pg->count;
/* contiguous pages extend the previous niobuf instead of starting a new one */
1465 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1467 niobuf->rnb_len += pg->count;
1469 niobuf->rnb_offset = pg->off;
1470 niobuf->rnb_len = pg->count;
1471 niobuf->rnb_flags = pg->flag;
1476 LASSERTF((void *)(niobuf - niocount) ==
1477 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1478 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1479 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1481 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
/* a resend tells the server via OBD_FL_RECOV_RESEND */
1483 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1484 body->oa.o_valid |= OBD_MD_FLFLAGS;
1485 body->oa.o_flags = 0;
1487 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1490 if (osc_should_shrink_grant(cli))
1491 osc_shrink_grant_local(cli, &body->oa);
1493 /* size[REQ_REC_OFF] still sizeof (*body) */
1494 if (opc == OST_WRITE) {
1495 if (cli->cl_checksum &&
1496 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1497 /* store cl_cksum_type in a local variable since
1498 * it can be changed via lprocfs */
1499 enum cksum_types cksum_type = cli->cl_cksum_type;
1501 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1502 body->oa.o_flags = 0;
1504 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1506 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1508 rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1509 requested_nob, page_count,
1513 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1517 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1520 /* save this in 'oa', too, for later checking */
1521 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1522 oa->o_flags |= obd_cksum_type_pack(obd_name,
1525 /* clear out the checksum flag, in case this is a
1526 * resend but cl_checksum is no longer set. b=11238 */
1527 oa->o_valid &= ~OBD_MD_FLCKSUM;
1529 oa->o_cksum = body->oa.o_cksum;
1530 /* 1 RC per niobuf */
1531 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1532 sizeof(__u32) * niocount);
/* read path: only advertise the checksum type we want the server to use */
1534 if (cli->cl_checksum &&
1535 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1536 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1537 body->oa.o_flags = 0;
1538 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1539 cli->cl_cksum_type);
1540 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1543 /* Client cksum has been already copied to wire obdo in previous
1544 * lustre_set_wire_obdo(), and in the case a bulk-read is being
1545 * resent due to cksum error, this will allow Server to
1546 * check+dump pages on its side */
1548 ptlrpc_request_set_replen(req);
1550 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1551 aa = ptlrpc_req_async_args(req);
1553 aa->aa_requested_nob = requested_nob;
1554 aa->aa_nio_count = niocount;
1555 aa->aa_page_count = page_count;
1559 INIT_LIST_HEAD(&aa->aa_oaps);
1562 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1563 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1564 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1565 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
/* error path: drop our reference on the partially built request */
1569 ptlrpc_req_finished(req);
/* file-scope scratch buffer for the checksum dump path name below */
1573 char dbgcksum_file_name[PATH_MAX];
/*
 * Dump the raw data of every page in @pga to a debug file so a checksum
 * mismatch can be analyzed offline.  The file name encodes FID, extent
 * range and both checksums; O_EXCL means only the first error for a
 * given range is kept (resends/retries find the file already exists).
 */
1575 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1576 struct brw_page **pga, __u32 server_cksum,
1584 /* will only keep dump of pages on first error for the same range in
1585 * file/fid, not during the resends/retries. */
1586 snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1587 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1588 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1589 libcfs_debug_file_path_arr :
1590 LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1591 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1592 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1593 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1595 pga[page_count-1]->off + pga[page_count-1]->count - 1,
1596 client_cksum, server_cksum);
1597 filp = filp_open(dbgcksum_file_name,
1598 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
/* -EEXIST (already dumped) is only debug-noise; other failures are errors */
1602 CDEBUG(D_INFO, "%s: can't open to dump pages with "
1603 "checksum error: rc = %d\n", dbgcksum_file_name,
1606 CERROR("%s: can't open to dump pages with checksum "
1607 "error: rc = %d\n", dbgcksum_file_name, rc);
/* write each page's mapped contents sequentially into the dump file */
1611 for (i = 0; i < page_count; i++) {
1612 len = pga[i]->count;
1613 buf = kmap(pga[i]->pg);
1615 rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1617 CERROR("%s: wanted to write %u but got %d "
1618 "error\n", dbgcksum_file_name, len, rc);
1623 CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1624 dbgcksum_file_name, rc);
/* force the dump to disk before closing so it survives a crash */
1629 rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1631 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1632 filp_close(filp, NULL);
/*
 * Diagnose a write-checksum mismatch reported by the server.
 *
 * Recomputes the client checksum over the still-pinned pages with the
 * checksum type the server actually used, optionally dumps the pages,
 * and logs which side of the transfer most likely corrupted the data.
 */
1637 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1638 __u32 client_cksum, __u32 server_cksum,
1639 struct osc_brw_async_args *aa)
1641 const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1642 enum cksum_types cksum_type;
1643 obd_dif_csum_fn *fn = NULL;
1644 int sector_size = 0;
/* fast path: checksums agree, nothing to diagnose */
1649 if (server_cksum == client_cksum) {
1650 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1654 if (aa->aa_cli->cl_checksum_dump)
1655 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1656 server_cksum, client_cksum);
/* use the checksum type the *server* replied with, not our original */
1658 cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1661 switch (cksum_type) {
1662 case OBD_CKSUM_T10IP512:
1666 case OBD_CKSUM_T10IP4K:
1670 case OBD_CKSUM_T10CRC512:
1671 fn = obd_dif_crc_fn;
1674 case OBD_CKSUM_T10CRC4K:
1675 fn = obd_dif_crc_fn;
1683 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1684 aa->aa_page_count, aa->aa_ppga,
1685 OST_WRITE, fn, sector_size,
1688 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1689 aa->aa_ppga, OST_WRITE, cksum_type,
/* classify the mismatch by comparing the recomputed value to both sides */
1693 msg = "failed to calculate the client write checksum";
1694 else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1695 msg = "the server did not use the checksum type specified in "
1696 "the original request - likely a protocol problem";
1697 else if (new_cksum == server_cksum)
1698 msg = "changed on the client after we checksummed it - "
1699 "likely false positive due to mmap IO (bug 11742)";
1700 else if (new_cksum == client_cksum)
1701 msg = "changed in transit before arrival at OST";
1703 msg = "changed in transit AND doesn't match the original - "
1704 "likely false positive due to mmap IO (bug 11742)";
1706 LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1707 DFID " object "DOSTID" extent [%llu-%llu], original "
1708 "client csum %x (type %x), server csum %x (type %x),"
1709 " client csum now %x\n",
1710 obd_name, msg, libcfs_nid2str(peer->nid),
1711 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1712 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1713 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1714 POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1715 aa->aa_ppga[aa->aa_page_count - 1]->off +
1716 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1718 obd_cksum_type_unpack(aa->aa_oa->o_flags),
1719 server_cksum, cksum_type, new_cksum);
1723 /* Note rc enters this function as number of bytes transferred */
/*
 * Complete a BRW RPC: unpack the reply body, update quota/grant state,
 * verify write RCs or read checksums, copy short-I/O read data back into
 * the pages, and fold the wire obdo back into the in-memory one.
 * Returns 0, a negative errno, or -EAGAIN to request a redo.
 */
1724 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1726 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1727 struct client_obd *cli = aa->aa_cli;
1728 const char *obd_name = cli->cl_import->imp_obd->obd_name;
1729 const struct lnet_process_id *peer =
1730 &req->rq_import->imp_connection->c_peer;
1731 struct ost_body *body;
1732 u32 client_cksum = 0;
/* -EDQUOT still carries a usable reply (quota flags); others bail out */
1735 if (rc < 0 && rc != -EDQUOT) {
1736 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1740 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1741 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1743 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1747 /* set/clear over quota flag for a uid/gid/projid */
1748 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1749 body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1750 unsigned qid[LL_MAXQUOTAS] = {
1751 body->oa.o_uid, body->oa.o_gid,
1752 body->oa.o_projid };
1753 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1754 body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1755 body->oa.o_valid, body->oa.o_flags);
1756 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1760 osc_update_grant(cli, body);
1765 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1766 client_cksum = aa->aa_oa->o_cksum; /* save for later */
/* write path: verify per-niobuf RCs and (optionally) the server checksum */
1768 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1770 CERROR("Unexpected +ve rc %d\n", rc);
1774 if (req->rq_bulk != NULL &&
1775 sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1778 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1779 check_write_checksum(&body->oa, peer, client_cksum,
1780 body->oa.o_cksum, aa))
1783 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1784 aa->aa_page_count, aa->aa_ppga);
1788 /* The rest of this function executes only for OST_READs */
/* short I/O reads have no bulk: size comes from the reply capsule */
1790 if (req->rq_bulk == NULL) {
1791 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1793 LASSERT(rc == req->rq_status);
1795 /* if unwrap_bulk failed, return -EAGAIN to retry */
1796 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1799 GOTO(out, rc = -EAGAIN);
1801 if (rc > aa->aa_requested_nob) {
1802 CERROR("Unexpected rc %d (%d requested)\n", rc,
1803 aa->aa_requested_nob);
1807 if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1808 CERROR ("Unexpected rc %d (%d transferred)\n",
1809 rc, req->rq_bulk->bd_nob_transferred);
/* copy inline short-I/O read data out of the reply into the pages */
1813 if (req->rq_bulk == NULL) {
1815 int nob, pg_count, i = 0;
1818 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1819 pg_count = aa->aa_page_count;
1820 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1823 while (nob > 0 && pg_count > 0) {
1825 int count = aa->aa_ppga[i]->count > nob ?
1826 nob : aa->aa_ppga[i]->count;
1828 CDEBUG(D_CACHE, "page %p count %d\n",
1829 aa->aa_ppga[i]->pg, count);
1830 ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1831 memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1833 ll_kunmap_atomic((void *) ptr, KM_USER0);
/* a short read zero-fills the tail pages */
1842 if (rc < aa->aa_requested_nob)
1843 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* verify the server-provided read checksum against a local recompute */
1845 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1846 static int cksum_counter;
1847 u32 server_cksum = body->oa.o_cksum;
1850 enum cksum_types cksum_type;
1851 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1852 body->oa.o_flags : 0;
1854 cksum_type = obd_cksum_type_unpack(o_flags);
1855 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1856 aa->aa_page_count, aa->aa_ppga,
1857 OST_READ, &client_cksum);
/* note when data was routed through an LNET router (possible corruption
 * point) */
1861 if (req->rq_bulk != NULL &&
1862 peer->nid != req->rq_bulk->bd_sender) {
1864 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1867 if (server_cksum != client_cksum) {
1868 struct ost_body *clbody;
1869 u32 page_count = aa->aa_page_count;
1871 clbody = req_capsule_client_get(&req->rq_pill,
1873 if (cli->cl_checksum_dump)
1874 dump_all_bulk_pages(&clbody->oa, page_count,
1875 aa->aa_ppga, server_cksum,
1878 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1879 "%s%s%s inode "DFID" object "DOSTID
1880 " extent [%llu-%llu], client %x, "
1881 "server %x, cksum_type %x\n",
1883 libcfs_nid2str(peer->nid),
1885 clbody->oa.o_valid & OBD_MD_FLFID ?
1886 clbody->oa.o_parent_seq : 0ULL,
1887 clbody->oa.o_valid & OBD_MD_FLFID ?
1888 clbody->oa.o_parent_oid : 0,
1889 clbody->oa.o_valid & OBD_MD_FLFID ?
1890 clbody->oa.o_parent_ver : 0,
1891 POSTID(&body->oa.o_oi),
1892 aa->aa_ppga[0]->off,
1893 aa->aa_ppga[page_count-1]->off +
1894 aa->aa_ppga[page_count-1]->count - 1,
1895 client_cksum, server_cksum,
1898 aa->aa_oa->o_cksum = client_cksum;
1902 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* we asked for a checksum but the server didn't send one: warn with
 * exponential backoff (power-of-two counter trick) */
1905 } else if (unlikely(client_cksum)) {
1906 static int cksum_missed;
1909 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1910 CERROR("Checksum %u requested from %s but not sent\n",
1911 cksum_missed, libcfs_nid2str(peer->nid));
/* propagate server-updated attributes back into the caller's obdo */
1917 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1918 aa->aa_oa, &body->oa);
/*
 * Rebuild and resubmit a BRW RPC after a recoverable error (-EINPROGRESS
 * etc.).  The new request takes over the page array and async-page list
 * from the old one; its send time is delayed to rate-limit resends.
 */
1923 static int osc_brw_redo_request(struct ptlrpc_request *request,
1924 struct osc_brw_async_args *aa, int rc)
1926 struct ptlrpc_request *new_req;
1927 struct osc_brw_async_args *new_aa;
1928 struct osc_async_page *oap;
1931 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1932 "redo for recoverable error %d", rc);
1934 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1935 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1936 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1937 aa->aa_ppga, &new_req, 1);
/* abort the redo if any page's owner was interrupted meanwhile */
1941 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1942 if (oap->oap_request != NULL) {
1943 LASSERTF(request == oap->oap_request,
1944 "request %p != oap_request %p\n",
1945 request, oap->oap_request);
1946 if (oap->oap_interrupted) {
1947 ptlrpc_req_finished(new_req);
1952 /* New request takes over pga and oaps from old request.
1953 * Note that copying a list_head doesn't work, need to move it... */
1955 new_req->rq_interpret_reply = request->rq_interpret_reply;
1956 new_req->rq_async_args = request->rq_async_args;
1957 new_req->rq_commit_cb = request->rq_commit_cb;
1958 /* cap resend delay to the current request timeout, this is similar to
1959 * what ptlrpc does (see after_reply()) */
1960 if (aa->aa_resends > new_req->rq_timeout)
1961 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1963 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1964 new_req->rq_generation_set = 1;
1965 new_req->rq_import_generation = request->rq_import_generation;
1967 new_aa = ptlrpc_req_async_args(new_req);
/* move (not copy) the oap and extent lists onto the new request's args */
1969 INIT_LIST_HEAD(&new_aa->aa_oaps);
1970 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1971 INIT_LIST_HEAD(&new_aa->aa_exts);
1972 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1973 new_aa->aa_resends = aa->aa_resends;
/* repoint each async page's request reference at the new request */
1975 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1976 if (oap->oap_request) {
1977 ptlrpc_req_finished(oap->oap_request);
1978 oap->oap_request = ptlrpc_request_addref(new_req);
1982 /* XXX: This code will run into problem if we're going to support
1983 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1984 * and wait for all of them to be finished. We should inherit request
1985 * set from old request. */
1986 ptlrpcd_add_req(new_req);
1988 DEBUG_REQ(D_INFO, new_req, "new request");
1993 * ugh, we want disk allocation on the target to happen in offset order. we'll
1994 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1995 * fine for our small page arrays and doesn't require allocation. its an
1996 * insertion sort that swaps elements that are strides apart, shrinking the
1997 * stride down until its '1' and the array is sorted.
/* Shellsort of @array by brw_page::off, ascending; in-place, no allocation. */
1999 static void sort_brw_pages(struct brw_page **array, int num)
2002 struct brw_page *tmp;
/* grow the stride using the 3n+1 sequence, then sort at each stride */
2006 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2011 for (i = stride ; i < num ; i++) {
/* standard insertion step: shift larger strided elements right */
2014 while (j >= stride && array[j - stride]->off > tmp->off) {
2015 array[j] = array[j - stride];
2020 } while (stride > 1);
/* Free the brw_page pointer array allocated for a BRW RPC (not the pages). */
2023 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2025 LASSERT(ppga != NULL);
2026 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Reply-interpret callback for BRW RPCs (runs in ptlrpcd context).
 *
 * Finishes the request, retries recoverable errors via
 * osc_brw_redo_request(), updates cached object attributes (size, KMS,
 * times, blocks) on success, completes all extents, releases the page
 * array and wakes cache waiters.
 */
2029 static int brw_interpret(const struct lu_env *env,
2030 struct ptlrpc_request *req, void *data, int rc)
2032 struct osc_brw_async_args *aa = data;
2033 struct osc_extent *ext;
2034 struct osc_extent *tmp;
2035 struct client_obd *cli = aa->aa_cli;
2036 unsigned long transferred = 0;
2039 rc = osc_brw_fini_request(req, rc);
2040 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2041 /* When server return -EINPROGRESS, client should always retry
2042 * regardless of the number of times the bulk was resent already. */
2043 if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2044 if (req->rq_import_generation !=
2045 req->rq_import->imp_generation) {
2046 CDEBUG(D_HA, "%s: resend cross eviction for object: "
2047 ""DOSTID", rc = %d.\n",
2048 req->rq_import->imp_obd->obd_name,
2049 POSTID(&aa->aa_oa->o_oi), rc);
2050 } else if (rc == -EINPROGRESS ||
2051 client_should_resend(aa->aa_resends, aa->aa_cli)) {
2052 rc = osc_brw_redo_request(req, aa, rc);
2054 CERROR("%s: too many resent retries for object: "
2055 "%llu:%llu, rc = %d.\n",
2056 req->rq_import->imp_obd->obd_name,
2057 POSTID(&aa->aa_oa->o_oi), rc);
2062 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* success: fold server-returned attributes into the cl_object cache */
2067 struct obdo *oa = aa->aa_oa;
2068 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2069 unsigned long valid = 0;
2070 struct cl_object *obj;
2071 struct osc_async_page *last;
2073 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2074 obj = osc2cl(last->oap_obj);
2076 cl_object_attr_lock(obj);
2077 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2078 attr->cat_blocks = oa->o_blocks;
2079 valid |= CAT_BLOCKS;
2081 if (oa->o_valid & OBD_MD_FLMTIME) {
2082 attr->cat_mtime = oa->o_mtime;
2085 if (oa->o_valid & OBD_MD_FLATIME) {
2086 attr->cat_atime = oa->o_atime;
2089 if (oa->o_valid & OBD_MD_FLCTIME) {
2090 attr->cat_ctime = oa->o_ctime;
2094 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2095 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2096 loff_t last_off = last->oap_count + last->oap_obj_off +
2099 /* Change file size if this is an out of quota or
2100 * direct IO write and it extends the file size */
2101 if (loi->loi_lvb.lvb_size < last_off) {
2102 attr->cat_size = last_off;
2105 /* Extend KMS if it's not a lockless write */
2106 if (loi->loi_kms < last_off &&
2107 oap2osc_page(last)->ops_srvlock == 0) {
2108 attr->cat_kms = last_off;
2114 cl_object_attr_update(env, obj, attr, valid);
2115 cl_object_attr_unlock(obj);
2117 OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2120 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2121 osc_inc_unstable_pages(req);
/* complete every extent; map no-delay failures to -EWOULDBLOCK */
2123 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2124 list_del_init(&ext->oe_link);
2125 osc_extent_finish(env, ext, 1,
2126 rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2128 LASSERT(list_empty(&aa->aa_exts));
2129 LASSERT(list_empty(&aa->aa_oaps));
2131 transferred = (req->rq_bulk == NULL ? /* short io */
2132 aa->aa_requested_nob :
2133 req->rq_bulk->bd_nob_transferred);
2135 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2136 ptlrpc_lprocfs_brw(req, transferred);
2138 spin_lock(&cli->cl_loi_list_lock);
2139 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2140 * is called so we know whether to go to sync BRWs or wait for more
2141 * RPCs to complete */
2142 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2143 cli->cl_w_in_flight--;
2145 cli->cl_r_in_flight--;
2146 osc_wake_cache_waiters(cli);
2147 spin_unlock(&cli->cl_loi_list_lock);
/* kick the IO engine in case more RPCs can now be formed */
2149 osc_io_unplug(env, cli, NULL);
/*
 * Commit callback for BRW write requests: once the server has committed
 * the transaction, the request's pages are no longer "unstable".
 * rq_lock serializes against osc_inc_unstable_pages() so the unstable
 * count cannot leak (see comment below).
 */
2153 static void brw_commit(struct ptlrpc_request *req)
2155 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2156 * this called via the rq_commit_cb, I need to ensure
2157 * osc_dec_unstable_pages is still called. Otherwise unstable
2158 * pages may be leaked. */
2159 spin_lock(&req->rq_lock);
2160 if (likely(req->rq_unstable)) {
2161 req->rq_unstable = 0;
2162 spin_unlock(&req->rq_lock);
2164 osc_dec_unstable_pages(req);
/* racing path: mark committed so the increment side does the decrement */
2166 req->rq_committed = 1;
2167 spin_unlock(&req->rq_lock);
2172 * Build an RPC by the list of extent @ext_list. The caller must ensure
2173 * that the total pages in this list are NOT over max pages per RPC.
2174 * Extents in the list must be in OES_RPC state.
/* On success the request is handed to ptlrpcd; on failure all extents in
 * @ext_list are finished with the error and allocations are released. */
2176 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2177 struct list_head *ext_list, int cmd)
2179 struct ptlrpc_request *req = NULL;
2180 struct osc_extent *ext;
2181 struct brw_page **pga = NULL;
2182 struct osc_brw_async_args *aa = NULL;
2183 struct obdo *oa = NULL;
2184 struct osc_async_page *oap;
2185 struct osc_object *obj = NULL;
2186 struct cl_req_attr *crattr = NULL;
2187 loff_t starting_offset = OBD_OBJECT_EOF;
2188 loff_t ending_offset = 0;
2192 bool soft_sync = false;
2193 bool interrupted = false;
2194 bool ndelay = false;
2198 __u32 layout_version = 0;
2199 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
2200 struct ost_body *body;
2202 LASSERT(!list_empty(ext_list));
2204 /* add pages into rpc_list to build BRW rpc */
2205 list_for_each_entry(ext, ext_list, oe_link) {
2206 LASSERT(ext->oe_state == OES_RPC);
2207 mem_tight |= ext->oe_memalloc;
2208 grant += ext->oe_grants;
2209 page_count += ext->oe_nr_pages;
2210 layout_version = MAX(layout_version, ext->oe_layout_version);
2215 soft_sync = osc_over_unstable_soft_limit(cli);
2217 mpflag = cfs_memory_pressure_get_and_set();
2219 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2221 GOTO(out, rc = -ENOMEM);
2223 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2225 GOTO(out, rc = -ENOMEM);
/* flatten all extents' async pages into pga[] and rpc_list, tracking
 * the overall [starting_offset, ending_offset) range */
2228 list_for_each_entry(ext, ext_list, oe_link) {
2229 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2231 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2233 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2234 pga[i] = &oap->oap_brw_page;
2235 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2238 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2239 if (starting_offset == OBD_OBJECT_EOF ||
2240 starting_offset > oap->oap_obj_off)
2241 starting_offset = oap->oap_obj_off;
2243 LASSERT(oap->oap_page_off == 0);
2244 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2245 ending_offset = oap->oap_obj_off +
2248 LASSERT(oap->oap_page_off + oap->oap_count ==
2250 if (oap->oap_interrupted)
2257 /* first page in the list */
2258 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
/* let upper layers (cl_req_attr_set) fill the obdo from object state */
2260 crattr = &osc_env_info(env)->oti_req_attr;
2261 memset(crattr, 0, sizeof(*crattr));
2262 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2263 crattr->cra_flags = ~0ULL;
2264 crattr->cra_page = oap2cl_page(oap);
2265 crattr->cra_oa = oa;
2266 cl_req_attr_set(env, osc2cl(obj), crattr);
2268 if (cmd == OBD_BRW_WRITE) {
2269 oa->o_grant_used = grant;
2270 if (layout_version > 0) {
2271 CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2272 PFID(&oa->o_oi.oi_fid), layout_version);
2274 oa->o_layout_version = layout_version;
2275 oa->o_valid |= OBD_MD_LAYOUT_VERSION;
/* pages must be offset-sorted before building the niobuf array */
2279 sort_brw_pages(pga, page_count);
2280 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2282 CERROR("prep_req failed: %d\n", rc);
2286 req->rq_commit_cb = brw_commit;
2287 req->rq_interpret_reply = brw_interpret;
2288 req->rq_memalloc = mem_tight != 0;
2289 oap->oap_request = ptlrpc_request_addref(req);
2290 if (interrupted && !req->rq_intr)
2291 ptlrpc_mark_interrupted(req);
2293 req->rq_no_resend = req->rq_no_delay = 1;
2294 /* probably set a shorter timeout value.
2295 * to handle ETIMEDOUT in brw_interpret() correctly. */
2296 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2299 /* Need to update the timestamps after the request is built in case
2300 * we race with setattr (locally or in queue at OST). If OST gets
2301 * later setattr before earlier BRW (as determined by the request xid),
2302 * the OST will not use BRW timestamps. Sadly, there is no obvious
2303 * way to do this in a single call. bug 10150 */
2304 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2305 crattr->cra_oa = &body->oa;
2306 crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2307 cl_req_attr_set(env, osc2cl(obj), crattr);
2308 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2310 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2311 aa = ptlrpc_req_async_args(req);
2312 INIT_LIST_HEAD(&aa->aa_oaps);
2313 list_splice_init(&rpc_list, &aa->aa_oaps);
2314 INIT_LIST_HEAD(&aa->aa_exts);
2315 list_splice_init(ext_list, &aa->aa_exts);
/* account the in-flight RPC and feed the lprocfs histograms */
2317 spin_lock(&cli->cl_loi_list_lock);
2318 starting_offset >>= PAGE_SHIFT;
2319 if (cmd == OBD_BRW_READ) {
2320 cli->cl_r_in_flight++;
2321 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2322 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2323 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2324 starting_offset + 1);
2326 cli->cl_w_in_flight++;
2327 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2328 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2329 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2330 starting_offset + 1);
2332 spin_unlock(&cli->cl_loi_list_lock);
2334 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2335 page_count, aa, cli->cl_r_in_flight,
2336 cli->cl_w_in_flight);
2337 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2339 ptlrpcd_add_req(req);
2345 cfs_memory_pressure_restore(mpflag);
/* error path: free allocations and fail every extent still on the list */
2348 LASSERT(req == NULL);
2351 OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2353 OBD_FREE(pga, sizeof(*pga) * page_count);
2354 /* this should happen rarely and is pretty bad, it makes the
2355 * pending list not follow the dirty order */
2356 while (!list_empty(ext_list)) {
2357 ext = list_entry(ext_list->next, struct osc_extent,
2359 list_del_init(&ext->oe_link);
2360 osc_extent_finish(env, ext, 0, rc);
/*
 * Atomically attach @data to @lock's l_ast_data under the resource lock.
 * Succeeds if the slot was empty or already holds the same @data.
 */
2366 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2370 LASSERT(lock != NULL);
2372 lock_res_and_lock(lock);
2374 if (lock->l_ast_data == NULL)
2375 lock->l_ast_data = data;
2376 if (lock->l_ast_data == data)
2379 unlock_res_and_lock(lock);
/*
 * Finish an OSC lock enqueue: translate an intent-ABORTED reply into its
 * real status, mark the LVB ready on success, invoke the caller's upcall
 * and drop the references taken in ldlm_cli_enqueue().
 */
2384 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2385 void *cookie, struct lustre_handle *lockh,
2386 enum ldlm_mode mode, __u64 *flags, bool speculative,
2389 bool intent = *flags & LDLM_FL_HAS_INTENT;
2393 /* The request was created before ldlm_cli_enqueue call. */
2394 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2395 struct ldlm_reply *rep;
2397 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2398 LASSERT(rep != NULL);
/* lock_policy_res1 carries the server's real status in wire order */
2400 rep->lock_policy_res1 =
2401 ptlrpc_status_ntoh(rep->lock_policy_res1);
2402 if (rep->lock_policy_res1)
2403 errcode = rep->lock_policy_res1;
2405 *flags |= LDLM_FL_LVB_READY;
2406 } else if (errcode == ELDLM_OK) {
2407 *flags |= LDLM_FL_LVB_READY;
2410 /* Call the update callback. */
2411 rc = (*upcall)(cookie, lockh, errcode);
2413 /* release the reference taken in ldlm_cli_enqueue() */
2414 if (errcode == ELDLM_LOCK_MATCHED)
2416 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2417 ldlm_lock_decref(lockh, mode);
/*
 * Reply-interpret callback for an async lock enqueue.  Completes the
 * LDLM side via ldlm_cli_enqueue_fini(), then the OSC side via
 * osc_enqueue_fini(); an extra lock reference is held across the upcall
 * so a blocking AST for a failed lock cannot race ahead of it.
 */
2422 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2423 struct osc_enqueue_args *aa, int rc)
2425 struct ldlm_lock *lock;
2426 struct lustre_handle *lockh = &aa->oa_lockh;
2427 enum ldlm_mode mode = aa->oa_mode;
2428 struct ost_lvb *lvb = aa->oa_lvb;
2429 __u32 lvb_len = sizeof(*lvb);
2434 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2436 lock = ldlm_handle2lock(lockh);
2437 LASSERTF(lock != NULL,
2438 "lockh %#llx, req %p, aa %p - client evicted?\n",
2439 lockh->cookie, req, aa);
2441 /* Take an additional reference so that a blocking AST that
2442 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2443 * to arrive after an upcall has been executed by
2444 * osc_enqueue_fini(). */
2445 ldlm_lock_addref(lockh, mode);
2447 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2448 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2450 /* Let CP AST to grant the lock first. */
2451 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* speculative (e.g. readahead) enqueues carry no LVB/flags; use locals */
2453 if (aa->oa_speculative) {
2454 LASSERT(aa->oa_lvb == NULL);
2455 LASSERT(aa->oa_flags == NULL);
2456 aa->oa_flags = &flags;
2459 /* Complete obtaining the lock procedure. */
2460 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2461 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2463 /* Complete osc stuff. */
2464 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2465 aa->oa_flags, aa->oa_speculative, rc);
2467 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* drop the extra reference taken above */
2469 ldlm_lock_decref(lockh, mode);
2470 LDLM_LOCK_PUT(lock);
/* Sentinel "request set" pointer: callers of osc_enqueue_base() pass this
 * instead of a real set to mean "hand the request to ptlrpcd" (see the
 * rqset == PTLRPCD_SET check before ptlrpcd_add_req() below).  It is never
 * dereferenced. */
2474 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2476 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2477 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2478 * other synchronous requests, however keeping some locks and trying to obtain
2479 * others may take a considerable amount of time in a case of ost failure; and
2480 * when other sync requests do not get released lock from a client, the client
2481 is evicted from the cluster -- such scenarios make life difficult, so
2482 * release locks just after they are obtained. */
/*
 * Enqueue (or match) an extent DLM lock for this OSC.
 * First tries ldlm_lock_match() against locally cached locks; on a match
 * the upcall is invoked with ELDLM_LOCK_MATCHED.  Otherwise a new enqueue
 * RPC is built and either sent asynchronously (interpreted by
 * osc_enqueue_interpret()) or completed synchronously via
 * osc_enqueue_fini().
 * NOTE(review): the extract is missing some lines (braces, some returns);
 * comments describe only the visible code.
 */
2483 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2484 __u64 *flags, union ldlm_policy_data *policy,
2485 struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2486 void *cookie, struct ldlm_enqueue_info *einfo,
2487 struct ptlrpc_request_set *rqset, int async,
2490 struct obd_device *obd = exp->exp_obd;
2491 struct lustre_handle lockh = { 0 };
2492 struct ptlrpc_request *req = NULL;
2493 int intent = *flags & LDLM_FL_HAS_INTENT;
2494 __u64 match_flags = *flags;
2495 enum ldlm_mode mode;
2499 /* Filesystem lock extents are extended to page boundaries so that
2500 * dealing with the page cache is a little smoother. */
2501 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2502 policy->l_extent.end |= ~PAGE_MASK;
2504 /* Next, search for already existing extent locks that will cover us */
2505 /* If we're trying to read, we also search for an existing PW lock. The
2506 * VFS and page cache already protect us locally, so lots of readers/
2507 * writers can share a single PW lock.
2509 * There are problems with conversion deadlocks, so instead of
2510 * converting a read lock to a write lock, we'll just enqueue a new
2513 * At some point we should cancel the read lock instead of making them
2514 * send us a blocking callback, but there are problems with canceling
2515 * locks out from other users right now, too. */
2516 mode = einfo->ei_mode;
2517 if (einfo->ei_mode == LCK_PR)
2519 /* Normal lock requests must wait for the LVB to be ready before
2520 * matching a lock; speculative lock requests do not need to,
2521 * because they will not actually use the lock. */
2523 match_flags |= LDLM_FL_LVB_READY;
2525 match_flags |= LDLM_FL_BLOCK_GRANTED;
2526 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2527 einfo->ei_type, policy, mode, &lockh, 0);
2529 struct ldlm_lock *matched;
2531 if (*flags & LDLM_FL_TEST_LOCK)
2534 matched = ldlm_handle2lock(&lockh);
2536 /* This DLM lock request is speculative, and does not
2537 * have an associated IO request. Therefore if there
2538 * is already a DLM lock, it will just inform the
2539 * caller to cancel the request for this stripe.*/
2540 lock_res_and_lock(matched);
2541 if (ldlm_extent_equal(&policy->l_extent,
2542 &matched->l_policy_data.l_extent))
2546 unlock_res_and_lock(matched);
2548 ldlm_lock_decref(&lockh, mode);
2549 LDLM_LOCK_PUT(matched);
2551 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2552 *flags |= LDLM_FL_LVB_READY;
2554 /* We already have a lock, and it's referenced. */
2555 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2557 ldlm_lock_decref(&lockh, mode);
2558 LDLM_LOCK_PUT(matched);
2561 ldlm_lock_decref(&lockh, mode);
2562 LDLM_LOCK_PUT(matched);
2566 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
/* No cached lock matched: build a fresh LDLM enqueue request with
 * room for the server-returned LVB. */
2570 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2571 &RQF_LDLM_ENQUEUE_LVB);
2575 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2577 ptlrpc_request_free(req);
2581 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2583 ptlrpc_request_set_replen(req);
2586 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2587 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2589 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2590 sizeof(*lvb), LVB_T_OST, &lockh, async);
/* Async path: stash completion state in rq_async_args and let
 * osc_enqueue_interpret() finish the job from ptlrpcd context. */
2593 struct osc_enqueue_args *aa;
2594 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2595 aa = ptlrpc_req_async_args(req);
2597 aa->oa_mode = einfo->ei_mode;
2598 aa->oa_type = einfo->ei_type;
2599 lustre_handle_copy(&aa->oa_lockh, &lockh);
2600 aa->oa_upcall = upcall;
2601 aa->oa_cookie = cookie;
2602 aa->oa_speculative = speculative;
2604 aa->oa_flags = flags;
2607 /* speculative locks are essentially to enqueue
2608 * a DLM lock in advance, so we don't care
2609 * about the result of the enqueue. */
2611 aa->oa_flags = NULL;
2614 req->rq_interpret_reply =
2615 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2616 if (rqset == PTLRPCD_SET)
2617 ptlrpcd_add_req(req);
2619 ptlrpc_set_add_req(rqset, req);
2620 } else if (intent) {
2621 ptlrpc_req_finished(req);
/* Synchronous path: complete the enqueue inline. */
2626 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2627 flags, speculative, rc);
2629 ptlrpc_req_finished(req);
/*
 * Match an already-granted DLM extent lock against the given resource /
 * policy without issuing an RPC.  On a match, attaches @obj as the lock's
 * ast_data via osc_set_lock_data() and, if not done yet, caches the LVB
 * into the osc object (guarded by ldlm_is_lvb_cached()).
 * NOTE(review): extract is missing some lines; comments describe only
 * visible code.
 */
2634 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2635 struct ldlm_res_id *res_id, enum ldlm_type type,
2636 union ldlm_policy_data *policy, enum ldlm_mode mode,
2637 __u64 *flags, struct osc_object *obj,
2638 struct lustre_handle *lockh, int unref)
2640 struct obd_device *obd = exp->exp_obd;
2641 __u64 lflags = *flags;
/* Fault-injection hook for testing the no-match path. */
2645 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2648 /* Filesystem lock extents are extended to page boundaries so that
2649 * dealing with the page cache is a little smoother */
2650 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2651 policy->l_extent.end |= ~PAGE_MASK;
2653 /* Next, search for already existing extent locks that will cover us */
2654 /* If we're trying to read, we also search for an existing PW lock. The
2655 * VFS and page cache already protect us locally, so lots of readers/
2656 * writers can share a single PW lock. */
2660 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2661 res_id, type, policy, rc, lockh, unref)
2662 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2666 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2668 LASSERT(lock != NULL);
2669 if (osc_set_lock_data(lock, obj)) {
2670 lock_res_and_lock(lock);
/* Populate the osc object's LVB from this lock exactly once. */
2671 if (!ldlm_is_lvb_cached(lock)) {
2672 LASSERT(lock->l_ast_data == obj);
2673 osc_lock_lvb_update(env, obj, lock, NULL);
2674 ldlm_set_lvb_cached(lock);
2676 unlock_res_and_lock(lock);
/* osc_set_lock_data() failed: release the match reference. */
2678 ldlm_lock_decref(lockh, rc);
2681 LDLM_LOCK_PUT(lock);
/*
 * Interpret callback for the async OST_STATFS RPC issued by
 * osc_statfs_async(): copies the server's obd_statfs reply into the
 * caller's buffer and invokes the oi_cb_up completion callback.
 */
2686 static int osc_statfs_interpret(const struct lu_env *env,
2687 struct ptlrpc_request *req,
2688 struct osc_async_args *aa, int rc)
2690 struct obd_statfs *msfs;
2694 /* The request has in fact never been sent
2695 * due to issues at a higher level (LOV).
2696 * Exit immediately since the caller is
2697 * aware of the problem and takes care
2698 * of the clean up */
2701 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2702 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2708 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
/* Missing/invalid reply body is a protocol error. */
2710 GOTO(out, rc = -EPROTO);
2713 *aa->aa_oi->oi_osfs = *msfs;
2715 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Issue an OST_STATFS RPC asynchronously; the reply is handled by
 * osc_statfs_interpret() and delivered via oinfo->oi_cb_up.
 * @max_age is currently unused on the wire (see comment below).
 */
2719 static int osc_statfs_async(struct obd_export *exp,
2720 struct obd_info *oinfo, time64_t max_age,
2721 struct ptlrpc_request_set *rqset)
2723 struct obd_device *obd = class_exp2obd(exp);
2724 struct ptlrpc_request *req;
2725 struct osc_async_args *aa;
2729 /* We could possibly pass max_age in the request (as an absolute
2730 * timestamp or a "seconds.usec ago") so the target can avoid doing
2731 * extra calls into the filesystem if that isn't necessary (e.g.
2732 * during mount that would help a bit). Having relative timestamps
2733 * is not so great if request processing is slow, while absolute
2734 * timestamps are not ideal because they need time synchronization. */
2735 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2739 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2741 ptlrpc_request_free(req);
2744 ptlrpc_request_set_replen(req);
2745 req->rq_request_portal = OST_CREATE_PORTAL;
2746 ptlrpc_at_set_req_timeout(req);
2748 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2749 /* procfs callers must not block on a stuck import, so disable
2750 * resend and delay to avoid deadlock */
2750 req->rq_no_resend = 1;
2751 req->rq_no_delay = 1;
2754 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2755 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2756 aa = ptlrpc_req_async_args(req);
2759 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: sends the RPC with ptlrpc_queue_wait() and
 * copies the reply into @osfs.  Takes its own reference on the import
 * under cl_sem because the caller may race with disconnect (see below).
 */
2763 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2764 struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2766 struct obd_device *obd = class_exp2obd(exp);
2767 struct obd_statfs *msfs;
2768 struct ptlrpc_request *req;
2769 struct obd_import *imp = NULL;
2774 /* The request might also come from lprocfs, so we need to
2775 * synchronize with client_disconnect_export() -- Bug15684 */
2776 down_read(&obd->u.cli.cl_sem);
2777 if (obd->u.cli.cl_import)
2778 imp = class_import_get(obd->u.cli.cl_import);
2779 up_read(&obd->u.cli.cl_sem);
2783 /* We could possibly pass max_age in the request (as an absolute
2784 * timestamp or a "seconds.usec ago") so the target can avoid doing
2785 * extra calls into the filesystem if that isn't necessary (e.g.
2786 * during mount that would help a bit). Having relative timestamps
2787 * is not so great if request processing is slow, while absolute
2788 * timestamps are not ideal because they need time synchronization. */
2789 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* Import reference is only needed for the allocation above. */
2791 class_import_put(imp);
2796 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2798 ptlrpc_request_free(req);
2801 ptlrpc_request_set_replen(req);
2802 req->rq_request_portal = OST_CREATE_PORTAL;
2803 ptlrpc_at_set_req_timeout(req);
2805 if (flags & OBD_STATFS_NODELAY) {
2806 /* procfs callers must not block on a stuck import, so disable
2807 * resend and delay to avoid deadlock */
2807 req->rq_no_resend = 1;
2808 req->rq_no_delay = 1;
2811 rc = ptlrpc_queue_wait(req);
2815 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
/* Missing/invalid reply body is a protocol error. */
2817 GOTO(out, rc = -EPROTO);
2823 ptlrpc_req_finished(req);
/*
 * ioctl entry point for the OSC device.  Pins this module for the
 * duration of the call and dispatches on @cmd: import recovery,
 * (de)activation, and target ping; everything else is -ENOTTY.
 */
2827 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2828 void *karg, void __user *uarg)
2830 struct obd_device *obd = exp->exp_obd;
2831 struct obd_ioctl_data *data = karg;
/* Prevent module unload while an ioctl is in flight. */
2835 if (!try_module_get(THIS_MODULE)) {
2836 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2837 module_name(THIS_MODULE));
2841 case OBD_IOC_CLIENT_RECOVER:
2842 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2843 data->ioc_inlbuf1, 0);
2847 case IOC_OSC_SET_ACTIVE:
2848 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2851 case OBD_IOC_PING_TARGET:
2852 err = ptlrpc_obd_ping(obd);
2855 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2856 cmd, current_comm());
2857 GOTO(out, err = -ENOTTY);
2860 module_put(THIS_MODULE);
/*
 * Handle a set_info key/value pair.  Several keys are processed locally
 * (checksum toggle, sptlrpc config/context, LRU cache attach, LRU shrink);
 * anything else is forwarded to the OST via an OST_SET_INFO RPC.
 * KEY_GRANT_SHRINK requests are always sent via ptlrpcd with a dedicated
 * interpret callback; other forwarded keys require a caller-supplied @set.
 * NOTE(review): extract is missing some lines; comments describe only
 * visible code.
 */
2864 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2865 u32 keylen, void *key, u32 vallen, void *val,
2866 struct ptlrpc_request_set *set)
2868 struct ptlrpc_request *req;
2869 struct obd_device *obd = exp->exp_obd;
2870 struct obd_import *imp = class_exp2cliimp(exp);
2875 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* Locally handled key: enable/disable bulk checksums. */
2877 if (KEY_IS(KEY_CHECKSUM)) {
2878 if (vallen != sizeof(int))
2880 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2884 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2885 sptlrpc_conf_client_adapt(obd);
2889 if (KEY_IS(KEY_FLUSH_CTX)) {
2890 sptlrpc_import_flush_my_ctx(imp);
/* Locally handled key: attach the shared client page cache and join
 * its LRU list (done at most once per client_obd). */
2894 if (KEY_IS(KEY_CACHE_SET)) {
2895 struct client_obd *cli = &obd->u.cli;
2897 LASSERT(cli->cl_cache == NULL); /* only once */
2898 cli->cl_cache = (struct cl_client_cache *)val;
2899 cl_cache_incref(cli->cl_cache);
2900 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2902 /* add this osc into entity list */
2903 LASSERT(list_empty(&cli->cl_lru_osc));
2904 spin_lock(&cli->cl_cache->ccc_lru_lock);
2905 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2906 spin_unlock(&cli->cl_cache->ccc_lru_lock);
/* Locally handled key: shrink at most half of the cached LRU pages,
 * capped by the caller-requested target. */
2911 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2912 struct client_obd *cli = &obd->u.cli;
2913 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2914 long target = *(long *)val;
2916 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2921 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2924 /* We pass all other commands directly to OST. Since nobody calls osc
2925 methods directly and everybody is supposed to go through LOV, we
2926 assume lov checked invalid values for us.
2927 The only recognised values so far are evict_by_nid and mds_conn.
2928 Even if something bad goes through, we'd get a -EINVAL from OST
2931 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2932 &RQF_OST_SET_GRANT_INFO :
2937 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2938 RCL_CLIENT, keylen);
2939 if (!KEY_IS(KEY_GRANT_SHRINK))
2940 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2941 RCL_CLIENT, vallen);
2942 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2944 ptlrpc_request_free(req);
2948 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2949 memcpy(tmp, key, keylen);
2950 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2953 memcpy(tmp, val, vallen);
/* Grant shrink: copy the obdo into slab-allocated async state and let
 * osc_shrink_grant_interpret() process the reply. */
2955 if (KEY_IS(KEY_GRANT_SHRINK)) {
2956 struct osc_grant_args *aa;
2959 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2960 aa = ptlrpc_req_async_args(req);
2961 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2963 ptlrpc_req_finished(req);
2966 *oa = ((struct ost_body *)val)->oa;
2968 req->rq_interpret_reply = osc_shrink_grant_interpret;
2971 ptlrpc_request_set_replen(req);
2972 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2973 LASSERT(set != NULL);
2974 ptlrpc_set_add_req(set, req);
2975 ptlrpc_check_set(NULL, set);
2977 ptlrpcd_add_req(req);
2982 EXPORT_SYMBOL(osc_set_info_async);
/*
 * Reconnect hook: recompute the grant this client reports to the server
 * in @data->ocd_grant (available + reserved + dirty, or twice the BRW
 * size if zero) and reset the lost-grant counter, all under
 * cl_loi_list_lock.
 */
2984 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2985 struct obd_device *obd, struct obd_uuid *cluuid,
2986 struct obd_connect_data *data, void *localdata)
2988 struct client_obd *cli = &obd->u.cli;
2990 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2994 spin_lock(&cli->cl_loi_list_lock);
2995 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2996 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
2997 /* restore ocd_grant_blkbits as client page bits */
2998 data->ocd_grant_blkbits = PAGE_SHIFT;
2999 grant += cli->cl_dirty_grant;
3001 grant += cli->cl_dirty_pages << PAGE_SHIFT;
/* If no grant at all, ask for two full BRW-sized chunks. */
3003 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3004 lost_grant = cli->cl_lost_grant;
3005 cli->cl_lost_grant = 0;
3006 spin_unlock(&cli->cl_loi_list_lock);
3008 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3009 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3010 data->ocd_version, data->ocd_grant, lost_grant);
3015 EXPORT_SYMBOL(osc_reconnect);
/*
 * Disconnect hook: disconnect the export first, then remove this client
 * from the grant-shrink list.  The ordering is deliberate -- see the
 * BUG18662 race described in the comment below.
 */
3017 int osc_disconnect(struct obd_export *exp)
3019 struct obd_device *obd = class_exp2obd(exp);
3022 rc = client_disconnect_export(exp);
3024 * Initially we put del_shrink_grant before disconnect_export, but it
3025 * causes the following problem if setup (connect) and cleanup
3026 * (disconnect) are tangled together.
3027 * connect p1 disconnect p2
3028 * ptlrpc_connect_import
3029 * ............... class_manual_cleanup
3032 * ptlrpc_connect_interrupt
3034 * add this client to shrink list
3036 * Bang! grant shrink thread trigger the shrink. BUG18662
3038 osc_del_grant_list(&obd->u.cli);
3041 EXPORT_SYMBOL(osc_disconnect);
/*
 * cfs_hash iterator callback used during import invalidation: for each
 * LDLM resource, grab the osc object from the first granted lock that
 * carries ast_data, clear LDLM_FL_CLEANED on all granted locks so a
 * second namespace-cleanup pass will cancel them, then invalidate the
 * osc object.  @arg is the lu_env of the caller.
 */
3043 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3044 struct hlist_node *hnode, void *arg)
3046 struct lu_env *env = arg;
3047 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3048 struct ldlm_lock *lock;
3049 struct osc_object *osc = NULL;
3053 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3054 if (lock->l_ast_data != NULL && osc == NULL) {
3055 osc = lock->l_ast_data;
3056 cl_object_get(osc2cl(osc));
3059 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3060 * by the 2nd round of ldlm_namespace_clean() call in
3061 * osc_import_event(). */
3062 ldlm_clear_cleaned(lock);
3067 osc_object_invalidate(env, osc);
3068 cl_object_put(env, osc2cl(osc));
3073 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
/*
 * Import state-change handler.  Resets grant on disconnect, performs a
 * two-pass LDLM namespace cleanup plus osc object invalidation on
 * IMP_EVENT_INVALIDATE, re-initializes grant and the request portal on
 * IMP_EVENT_OCD, and forwards (in)active/(de)activate events to the
 * observer.
 * NOTE(review): extract is missing some lines (switch statement opening,
 * some break/return lines); comments describe only visible code.
 */
3075 static int osc_import_event(struct obd_device *obd,
3076 struct obd_import *imp,
3077 enum obd_import_event event)
3079 struct client_obd *cli;
3083 LASSERT(imp->imp_obd == obd);
3086 case IMP_EVENT_DISCON: {
/* Grant is meaningless across a disconnect; zero it under the lock. */
3088 spin_lock(&cli->cl_loi_list_lock);
3089 cli->cl_avail_grant = 0;
3090 cli->cl_lost_grant = 0;
3091 spin_unlock(&cli->cl_loi_list_lock);
3094 case IMP_EVENT_INACTIVE: {
3095 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3098 case IMP_EVENT_INVALIDATE: {
3099 struct ldlm_namespace *ns = obd->obd_namespace;
/* First cleanup pass, then flush cached IO and invalidate osc
 * objects per-resource, then a second cleanup pass (locks had
 * their CLEANED flag cleared by the iterator). */
3103 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3105 env = cl_env_get(&refcheck);
3107 osc_io_unplug(env, &obd->u.cli, NULL);
3109 cfs_hash_for_each_nolock(ns->ns_rs_hash,
3110 osc_ldlm_resource_invalidate,
3112 cl_env_put(env, &refcheck);
3114 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3119 case IMP_EVENT_ACTIVE: {
3120 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3123 case IMP_EVENT_OCD: {
3124 struct obd_connect_data *ocd = &imp->imp_connect_data;
3126 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3127 osc_init_grant(&obd->u.cli, ocd);
3130 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3131 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3133 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3136 case IMP_EVENT_DEACTIVATE: {
3137 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3140 case IMP_EVENT_ACTIVATE: {
3141 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3145 CERROR("Unknown import event %d\n", event);
/*
 * Determine whether the lock can be canceled before replaying the lock
 * during recovery, see bug16774 for detailed information.
 *
 * \retval zero the lock can't be canceled
 * \retval other ok to cancel
 */
3152 * Determine whether the lock can be canceled before replaying the lock
3153 * during recovery, see bug16774 for detailed information.
3155 * \retval zero the lock can't be canceled
3156 * \retval other ok to cancel
3158 static int osc_cancel_weight(struct ldlm_lock *lock)
/* Only unused (weight 0), granted extent locks are worth canceling. */
3161 * Cancel all unused and granted extent lock.
3163 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3164 ldlm_is_granted(lock) &&
3165 osc_ldlm_weigh_ast(lock) == 0)
/*
 * ptlrpcd work callback (registered in osc_setup_common() as
 * cl_writeback_work): flush pending cached IO for this client_obd.
 */
3171 static int brw_queue_work(const struct lu_env *env, void *data)
3173 struct client_obd *cli = data;
3175 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3177 osc_io_unplug(env, cli, NULL);
/*
 * Common setup shared by OSC-like devices: take a ptlrpcd reference, set
 * up the client obd, allocate the writeback and LRU ptlrpcd work items,
 * initialize quota and the grant-shrink interval.  On failure the error
 * path below tears down whatever was created (goto-cleanup style).
 */
3181 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3183 struct client_obd *cli = &obd->u.cli;
3189 rc = ptlrpcd_addref();
3193 rc = client_obd_setup(obd, lcfg);
3195 GOTO(out_ptlrpcd, rc);
3198 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3199 if (IS_ERR(handler))
3200 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3201 cli->cl_writeback_work = handler;
3203 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3204 if (IS_ERR(handler))
3205 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3206 cli->cl_lru_work = handler;
3208 rc = osc_quota_setup(obd);
3210 GOTO(out_ptlrpcd_work, rc);
3212 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3213 osc_update_next_shrink(cli);
/* Error path: destroy any work items that were created, then undo
 * client_obd_setup(). */
3218 if (cli->cl_writeback_work != NULL) {
3219 ptlrpcd_destroy_work(cli->cl_writeback_work);
3220 cli->cl_writeback_work = NULL;
3222 if (cli->cl_lru_work != NULL) {
3223 ptlrpcd_destroy_work(cli->cl_lru_work);
3224 cli->cl_lru_work = NULL;
3226 client_obd_cleanup(obd);
3231 EXPORT_SYMBOL(osc_setup_common);
/*
 * Full OSC device setup: common setup, tunables, opportunistic growth of
 * the shared request pool, cancel-weight registration, and joining the
 * global shrink list.  Also configures idle-disconnect parameters on the
 * import.
 */
3233 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3235 struct client_obd *cli = &obd->u.cli;
3243 rc = osc_setup_common(obd, lcfg);
3247 rc = osc_tunables_init(obd);
3252 * We try to control the total number of requests with a upper limit
3253 * osc_reqpool_maxreqcount. There might be some race which will cause
3254 * over-limit allocation, but it is fine.
3256 req_count = atomic_read(&osc_pool_req_count);
3257 if (req_count < osc_reqpool_maxreqcount) {
3258 adding = cli->cl_max_rpcs_in_flight + 2;
3259 if (req_count + adding > osc_reqpool_maxreqcount)
3260 adding = osc_reqpool_maxreqcount - req_count;
3262 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3263 atomic_add(added, &osc_pool_req_count);
3266 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
/* Make this client visible to the global cache shrinker. */
3268 spin_lock(&osc_shrink_lock);
3269 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3270 spin_unlock(&osc_shrink_lock);
3271 cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3272 cli->cl_import->imp_idle_debug = D_HA;
/*
 * Pre-cleanup shared by OSC-like devices: wait out zombie exports (see
 * the echo-client call chain below), destroy the ptlrpcd work items
 * created in osc_setup_common(), and clean up the client import.
 */
3277 int osc_precleanup_common(struct obd_device *obd)
3279 struct client_obd *cli = &obd->u.cli;
3283 * for echo client, export may be on zombie list, wait for
3284 * zombie thread to cull it, because cli.cl_import will be
3285 * cleared in client_disconnect_export():
3286 * class_export_destroy() -> obd_cleanup() ->
3287 * echo_device_free() -> echo_client_cleanup() ->
3288 * obd_disconnect() -> osc_disconnect() ->
3289 * client_disconnect_export()
3291 obd_zombie_barrier();
3292 if (cli->cl_writeback_work) {
3293 ptlrpcd_destroy_work(cli->cl_writeback_work);
3294 cli->cl_writeback_work = NULL;
3297 if (cli->cl_lru_work) {
3298 ptlrpcd_destroy_work(cli->cl_lru_work);
3299 cli->cl_lru_work = NULL;
3302 obd_cleanup_client_import(obd);
3305 EXPORT_SYMBOL(osc_precleanup_common);
/*
 * OSC obd_ops precleanup: common precleanup plus lprocfs unregistration.
 */
3307 static int osc_precleanup(struct obd_device *obd)
3311 osc_precleanup_common(obd);
3313 ptlrpc_lprocfs_unregister_obd(obd);
/*
 * Final cleanup: leave the global shrink list, detach from the shared
 * client cache (mirror of the KEY_CACHE_SET attach in
 * osc_set_info_async()), free the quota cache and tear down the client
 * obd.
 */
3317 int osc_cleanup_common(struct obd_device *obd)
3319 struct client_obd *cli = &obd->u.cli;
3324 spin_lock(&osc_shrink_lock);
3325 list_del(&cli->cl_shrink_list);
3326 spin_unlock(&osc_shrink_lock);
3329 if (cli->cl_cache != NULL) {
3330 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3331 spin_lock(&cli->cl_cache->ccc_lru_lock);
3332 list_del_init(&cli->cl_lru_osc);
3333 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3334 cli->cl_lru_left = NULL;
3335 cl_cache_decref(cli->cl_cache);
3336 cli->cl_cache = NULL;
3339 /* free memory of osc quota cache */
3340 osc_quota_cleanup(obd);
3342 rc = client_obd_cleanup(obd);
3347 EXPORT_SYMBOL(osc_cleanup_common);
/*
 * Apply a PARAM_OSC config record via class_modify_config().  A positive
 * count (parameters applied) maps to success; zero or a negative error
 * is returned as-is.
 */
3349 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3351 ssize_t count = class_modify_config(lcfg, PARAM_OSC,
3352 &obd->obd_kset.kobj);
3353 return count > 0 ? 0 : count;
/* obd_ops adapter: @len is unused, @buf is the lustre_cfg record. */
3356 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3358 return osc_process_config_base(obd, buf);
/* obd_ops vtable for the OSC device type, registered in osc_init().
 * Connection management is delegated to the generic client_* helpers;
 * the rest point at the OSC implementations in this file / module. */
3361 static struct obd_ops osc_obd_ops = {
3362 .o_owner = THIS_MODULE,
3363 .o_setup = osc_setup,
3364 .o_precleanup = osc_precleanup,
3365 .o_cleanup = osc_cleanup_common,
3366 .o_add_conn = client_import_add_conn,
3367 .o_del_conn = client_import_del_conn,
3368 .o_connect = client_connect_import,
3369 .o_reconnect = osc_reconnect,
3370 .o_disconnect = osc_disconnect,
3371 .o_statfs = osc_statfs,
3372 .o_statfs_async = osc_statfs_async,
3373 .o_create = osc_create,
3374 .o_destroy = osc_destroy,
3375 .o_getattr = osc_getattr,
3376 .o_setattr = osc_setattr,
3377 .o_iocontrol = osc_iocontrol,
3378 .o_set_info_async = osc_set_info_async,
3379 .o_import_event = osc_import_event,
3380 .o_process_config = osc_process_config,
3381 .o_quotactl = osc_quotactl,
/* Memory shrinker registered in osc_init(), and the global list of all
 * client_obd instances it may shrink; the list is protected by
 * osc_shrink_lock (clients join in osc_setup(), leave in
 * osc_cleanup_common()). */
3384 static struct shrinker *osc_cache_shrinker;
3385 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3386 DEFINE_SPINLOCK(osc_shrink_lock);
/* Compatibility wrapper for kernels whose shrinker API has a single
 * ->shrink() callback instead of separate ->count_objects()/
 * ->scan_objects(): translate the old arguments into a shrink_control
 * and call the scan/count pair. */
3388 #ifndef HAVE_SHRINKER_COUNT
3389 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3391 struct shrink_control scv = {
3392 .nr_to_scan = shrink_param(sc, nr_to_scan),
3393 .gfp_mask = shrink_param(sc, gfp_mask)
3395 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3396 struct shrinker *shrinker = NULL;
3399 (void)osc_cache_shrink_scan(shrinker, &scv);
3401 return osc_cache_shrink_count(shrinker, &scv);
/*
 * Module init: initialize slab caches, register the OSC obd type (procfs
 * disabled when OSP provides it), install the cache shrinker, size and
 * create the shared request pool from osc_reqpool_mem_max, and start the
 * grant-shrink worker.  Failures unwind in reverse order via the labels
 * at the bottom.
 */
3405 static int __init osc_init(void)
3407 bool enable_proc = true;
3408 struct obd_type *type;
3409 unsigned int reqpool_size;
3410 unsigned int reqsize;
3412 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3413 osc_cache_shrink_count, osc_cache_shrink_scan);
3416 /* print an address of _any_ initialized kernel symbol from this
3417 * module, to allow debugging with gdb that doesn't support data
3418 * symbols from modules.*/
3419 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3421 rc = lu_kmem_init(osc_caches);
/* If the OSP type already exposes a proc symlink, skip OSC procfs. */
3425 type = class_search_type(LUSTRE_OSP_NAME);
3426 if (type != NULL && type->typ_procsym != NULL)
3427 enable_proc = false;
3429 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3430 LUSTRE_OSC_NAME, &osc_device_type);
3434 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3436 /* This is obviously too much memory, only prevent overflow here */
3437 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3438 GOTO(out_type, rc = -EINVAL);
3440 reqpool_size = osc_reqpool_mem_max << 20;
/* Round the per-request size up to the next power of two that holds
 * OST_IO_MAXREQSIZE. */
3443 while (reqsize < OST_IO_MAXREQSIZE)
3444 reqsize = reqsize << 1;
3447 * We don't enlarge the request count in OSC pool according to
3448 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3449 * tried after normal allocation failed. So a small OSC pool won't
3450 * cause much performance degression in most of cases.
3452 osc_reqpool_maxreqcount = reqpool_size / reqsize;
3454 atomic_set(&osc_pool_req_count, 0);
3455 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3456 ptlrpc_add_rqs_to_pool);
3458 if (osc_rq_pool == NULL)
3459 GOTO(out_type, rc = -ENOMEM);
3461 rc = osc_start_grant_work();
3463 GOTO(out_req_pool, rc);
/* Error unwind labels (reverse order of construction). */
3468 ptlrpc_free_rq_pool(osc_rq_pool);
3470 class_unregister_type(LUSTRE_OSC_NAME);
3472 lu_kmem_fini(osc_caches);
/*
 * Module exit: undo osc_init() in reverse order -- stop the grant worker,
 * remove the shrinker, unregister the obd type, free the slab caches and
 * the shared request pool.
 */
3477 static void __exit osc_exit(void)
3479 osc_stop_grant_work();
3480 remove_shrinker(osc_cache_shrinker);
3481 class_unregister_type(LUSTRE_OSC_NAME);
3482 lu_kmem_fini(osc_caches);
3483 ptlrpc_free_rq_pool(osc_rq_pool);
/* Standard kernel module metadata and entry points. */
3486 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3487 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3488 MODULE_VERSION(LUSTRE_VERSION_STRING);
3489 MODULE_LICENSE("GPL");
3491 module_init(osc_init);
3492 module_exit(osc_exit);