/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */
#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <uapi/linux/lustre/lustre_param.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"
atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

/* declared unsigned int to match the module_param() type check */
static unsigned int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args
struct osc_setattr_args {
	struct obdo		*sa_oa;
	obd_enqueue_update_f	 sa_upcall;
	void			*sa_cookie;
};

struct osc_fsync_args {
	struct osc_object	*fa_obj;
	struct obdo		*fa_oa;
	obd_enqueue_update_f	 fa_upcall;
	void			*fa_cookie;
};

struct osc_ladvise_args {
	struct obdo		*la_oa;
	obd_enqueue_update_f	 la_upcall;
	void			*la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
			 void *data, int rc);
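/*
 * Pack the attributes in @oa into the OST_BODY buffer of @req, converting
 * them to the wire obdo format negotiated in the import's connect data.
 */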
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
	struct ost_body *body;

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}
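/*
 * Synchronous getattr: send an OST_GETATTR RPC and wait for the reply,
 * then copy the returned attributes back into @oa. The server does not
 * know the client's preferred BRW size, so o_blksize is filled in locally
 * from cli_brw_size().
 */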
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	int rc;

	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	EXIT;
out:
	ptlrpc_req_finished(req);
	return rc;
}
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	int rc;

	ENTRY;

	LASSERT(oa->o_valid & OBD_MD_FLGROUP);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	EXIT;
out:
	ptlrpc_req_finished(req);
	return rc;
}
static int osc_setattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_setattr_args *sa, int rc)
{
	struct ost_body *body;

	ENTRY;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
			     &body->oa);
out:
	rc = sa->sa_upcall(sa->sa_cookie, rc);
	RETURN(rc);
}
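/*
 * Asynchronous setattr. If @rqset is NULL the request is fired through
 * ptlrpcd without waiting for (or interpreting) the reply; otherwise
 * osc_setattr_interpret() delivers the result to @upcall(@cookie, rc).
 */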
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
		      obd_enqueue_update_f upcall, void *cookie,
		      struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	int rc;

	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	/* do mds to ost setattr asynchronously */
	if (!rqset) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req);
	} else {
		req->rq_interpret_reply =
			(ptlrpc_interpterer_t)osc_setattr_interpret;

		CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
		sa = ptlrpc_req_async_args(req);
		sa->sa_oa = oa;
		sa->sa_upcall = upcall;
		sa->sa_cookie = cookie;

		if (rqset == PTLRPCD_SET)
			ptlrpcd_add_req(req);
		else
			ptlrpc_set_add_req(rqset, req);
	}

	RETURN(0);
}
static int osc_ladvise_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 void *arg, int rc)
{
	struct osc_ladvise_args *la = arg;
	struct ost_body *body;

	ENTRY;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	*la->la_oa = body->oa;
out:
	rc = la->la_upcall(la->la_cookie, rc);
	RETURN(rc);
}
/**
 * If rqset is NULL, do not wait for the response. Upcall and cookie could
 * also be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
		     struct ladvise_hdr *ladvise_hdr,
		     obd_enqueue_update_f upcall, void *cookie,
		     struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct osc_ladvise_args *la;
	int rc;
	struct lu_ladvise *req_ladvise;
	struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
	int num_advise = ladvise_hdr->lah_count;
	struct ladvise_hdr *req_ladvise_hdr;

	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
	if (req == NULL)
		RETURN(-ENOMEM);

	req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
			     num_advise * sizeof(*ladvise));
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
	if (rc != 0) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	req->rq_request_portal = OST_IO_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oa);

	req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
						 &RMF_OST_LADVISE_HDR);
	memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

	req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
	memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
	ptlrpc_request_set_replen(req);

	if (rqset == NULL) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req);
		RETURN(0);
	}

	req->rq_interpret_reply = osc_ladvise_interpret;
	CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
	la = ptlrpc_req_async_args(req);
	la->la_oa = oa;
	la->la_upcall = upcall;
	la->la_cookie = cookie;

	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req);
	else
		ptlrpc_set_add_req(rqset, req);

	RETURN(0);
}
static int osc_create(const struct lu_env *env, struct obd_export *exp,
		      struct obdo *oa)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	int rc;

	ENTRY;

	LASSERT(oa->o_valid & OBD_MD_FLGROUP);
	LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
	if (req == NULL)
		GOTO(out, rc = -ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
	if (rc) {
		ptlrpc_request_free(req);
		GOTO(out, rc);
	}

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out_req, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out_req, rc = -EPROTO);

	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	CDEBUG(D_HA, "transno: %lld\n",
	       lustre_msg_get_transno(req->rq_repmsg));
out_req:
	ptlrpc_req_finished(req);
out:
	RETURN(rc);
}
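/*
 * Send an OST_PUNCH request through ptlrpcd without waiting for the reply.
 * Completion is reported through osc_setattr_interpret(), which hands the
 * result to @upcall(@cookie, rc); note this reuses struct osc_setattr_args.
 */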
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
		   obd_enqueue_update_f upcall, void *cookie)
{
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	struct obd_import *imp = class_exp2cliimp(exp);
	struct ost_body *body;
	int rc;

	ENTRY;

	req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
	if (rc < 0) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_set_io_portal(req);

	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

	lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
	CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
	sa = ptlrpc_req_async_args(req);
	sa->sa_oa = oa;
	sa->sa_upcall = upcall;
	sa->sa_cookie = cookie;

	ptlrpcd_add_req(req);

	RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);
static int osc_sync_interpret(const struct lu_env *env,
			      struct ptlrpc_request *req,
			      void *arg, int rc)
{
	struct osc_fsync_args *fa = arg;
	struct ost_body *body;
	struct cl_attr *attr = &osc_env_info(env)->oti_attr;
	unsigned long valid = 0;
	struct cl_object *obj;

	ENTRY;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		CERROR("can't unpack ost_body\n");
		GOTO(out, rc = -EPROTO);
	}

	*fa->fa_oa = body->oa;
	obj = osc2cl(fa->fa_obj);

	/* Update osc object's blocks attribute */
	cl_object_attr_lock(obj);
	if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
		attr->cat_blocks = body->oa.o_blocks;
		valid |= CAT_BLOCKS;
	}

	if (valid != 0)
		cl_object_attr_update(env, obj, attr, valid);
	cl_object_attr_unlock(obj);
out:
	rc = fa->fa_upcall(fa->fa_cookie, rc);
	RETURN(rc);
}
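/*
 * Issue an OST_SYNC RPC for @obj; the sync range is packed into @oa (the
 * size/blocks fields are overloaded with start/end, see below). The reply
 * is handled by osc_sync_interpret(), which also refreshes the cl_object
 * blocks attribute from the returned obdo.
 */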
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
		  obd_enqueue_update_f upcall, void *cookie,
		  struct ptlrpc_request_set *rqset)
{
	struct obd_export *exp = osc_export(obj);
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct osc_fsync_args *fa;
	int rc;

	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	/* overload the size and blocks fields in the oa with start/end */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = osc_sync_interpret;

	CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
	fa = ptlrpc_req_async_args(req);
	fa->fa_obj = obj;
	fa->fa_oa = oa;
	fa->fa_upcall = upcall;
	fa->fa_cookie = cookie;

	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req);
	else
		ptlrpc_set_add_req(rqset, req);

	RETURN(0);
}
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into the @cancels list. Returns the number
 * of locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
				   struct list_head *cancels,
				   enum ldlm_mode mode, __u64 lock_flags)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;
	int count;

	ENTRY;

	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
	 * export) but disabled through procfs (flag in NS).
	 *
	 * This distinguishes from a case when ELC is not supported originally,
	 * when we still want to cancel locks in advance and just cancel them
	 * locally, without sending any RPC. */
	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
		RETURN(0);

	ostid_build_res_name(&oa->o_oi, &res_id);
	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
	if (IS_ERR(res))
		RETURN(0);

	LDLM_RESOURCE_ADDREF(res);
	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
					   lock_flags, 0, NULL);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	RETURN(count);
}
static int osc_destroy_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req, void *data,
				 int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

	atomic_dec(&cli->cl_destroy_in_flight);
	wake_up(&cli->cl_destroy_waitq);

	return 0;
}
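/*
 * Throttle concurrent OST_DESTROY RPCs: speculatively bump the in-flight
 * counter and allow the send only while it stays within
 * cl_max_rpcs_in_flight; otherwise undo the increment. The re-check and
 * wake_up() below close the race with a destroy that completed between
 * the two atomic operations.
 */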
static int osc_can_send_destroy(struct client_obd *cli)
{
	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
	    cli->cl_max_rpcs_in_flight) {
		/* The destroy request can be sent */
		return 1;
	}
	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
	    cli->cl_max_rpcs_in_flight) {
		/*
		 * The counter has been modified between the two atomic
		 * operations.
		 */
		wake_up(&cli->cl_destroy_waitq);
	}
	return 0;
}
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa)
{
	struct client_obd *cli = &exp->exp_obd->u.cli;
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct list_head cancels = LIST_HEAD_INIT(cancels);
	int rc, count;

	ENTRY;

	if (oa == NULL) {
		CDEBUG(D_INFO, "oa NULL\n");
		RETURN(-EINVAL);
	}

	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
					LDLM_FL_DISCARD_DATA);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
	if (req == NULL) {
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		RETURN(-ENOMEM);
	}

	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
			       0, &cancels, count);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = osc_destroy_interpret;
	if (!osc_can_send_destroy(cli)) {
		struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

		/*
		 * Wait until the number of on-going destroy RPCs drops
		 * under max_rpc_in_flight.
		 */
		rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
					    osc_can_send_destroy(cli), &lwi);
		if (rc) {
			ptlrpc_req_finished(req);
			RETURN(rc);
		}
	}

	/* Do not wait for response */
	ptlrpcd_add_req(req);
	RETURN(0);
}
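/*
 * Fill the grant/cache accounting fields (o_dirty, o_undirty, o_grant,
 * o_dropped) that piggy-back on every BRW so the server can track how much
 * dirty data and unused grant the client holds. As an illustration, with
 * cl_max_pages_per_rpc = 256 (1 MiB on 4 KiB pages) and
 * cl_max_rpcs_in_flight = 8, the client asks for roughly
 * (8 + 1) * 1 MiB = 9 MiB of "undirty" grant headroom (more if
 * cl_dirty_max_pages is larger), plus per-extent tax when the server
 * advertised GRANT_PARAM.
 */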
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
				long writing_bytes)
{
	u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

	LASSERT(!(oa->o_valid & bits));

	oa->o_valid |= bits;
	spin_lock(&cli->cl_loi_list_lock);
	if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
		oa->o_dirty = cli->cl_dirty_grant;
	else
		oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
	if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
		     cli->cl_dirty_max_pages)) {
		CERROR("dirty %lu - %lu > dirty_max %lu\n",
		       cli->cl_dirty_pages, cli->cl_dirty_transit,
		       cli->cl_dirty_max_pages);
		oa->o_undirty = 0;
	} else if (unlikely(atomic_long_read(&obd_dirty_pages) -
			    atomic_long_read(&obd_dirty_transit_pages) >
			    (long)(obd_max_dirty_pages + 1))) {
		/* The atomic_read() and the atomic_inc() are not covered by
		 * a lock, thus they may safely race and trip this CERROR()
		 * unless we add in a small fudge factor (+1). */
		CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
		       cli_name(cli), atomic_long_read(&obd_dirty_pages),
		       atomic_long_read(&obd_dirty_transit_pages),
		       obd_max_dirty_pages);
		oa->o_undirty = 0;
	} else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
			    0x7fffffff)) {
		CERROR("dirty %lu - dirty_max %lu too big???\n",
		       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
		oa->o_undirty = 0;
	} else {
		unsigned long nrpages;
		unsigned long undirty;

		nrpages = cli->cl_max_pages_per_rpc;
		nrpages *= cli->cl_max_rpcs_in_flight + 1;
		nrpages = max(nrpages, cli->cl_dirty_max_pages);
		undirty = nrpages << PAGE_SHIFT;
		if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
				 GRANT_PARAM)) {
			int nrextents;

			/* take extent tax into account when asking for more
			 * grant space */
			nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
				     cli->cl_max_extent_pages;
			undirty += nrextents * cli->cl_grant_extent_tax;
		}
		/* Do not ask for more than OBD_MAX_GRANT - a margin for server
		 * to add extent tax, etc.
		 */
		oa->o_undirty = min(undirty, OBD_MAX_GRANT -
				    (PTLRPC_MAX_BRW_PAGES << PAGE_SHIFT)*4UL);
	}
	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
	oa->o_dropped = cli->cl_lost_grant;
	cli->cl_lost_grant = 0;
	spin_unlock(&cli->cl_loi_list_lock);
	CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
void osc_update_next_shrink(struct client_obd *cli)
{
	cli->cl_next_shrink_grant = ktime_get_seconds() +
				    cli->cl_grant_shrink_interval;

	CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
	       cli->cl_next_shrink_grant);
}
static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
	spin_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant += grant;
	spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
	if (body->oa.o_valid & OBD_MD_FLGRANT) {
		CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
		__osc_update_grant(cli, body->oa.o_grant);
	}
}
/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
	struct list_head	gtd_clients;
	struct mutex		gtd_mutex;
	unsigned long		gtd_stopped:1;
};
static struct grant_thread_data client_gtd;
static int osc_shrink_grant_interpret(const struct lu_env *env,
				      struct ptlrpc_request *req,
				      void *aa, int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
	struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
	struct ost_body *body;

	if (rc != 0) {
		__osc_update_grant(cli, oa->o_grant);
		GOTO(out, rc);
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	osc_update_grant(cli, body);
out:
	OBDO_FREE(oa);
	return rc;
}
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
	spin_lock(&cli->cl_loi_list_lock);
	oa->o_grant = cli->cl_avail_grant / 4;
	cli->cl_avail_grant -= oa->o_grant;
	spin_unlock(&cli->cl_loi_list_lock);
	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
		oa->o_valid |= OBD_MD_FLFLAGS;
		oa->o_flags = 0;
	}
	oa->o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);
}
/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
			     (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

	spin_lock(&cli->cl_loi_list_lock);
	if (cli->cl_avail_grant <= target_bytes)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
	spin_unlock(&cli->cl_loi_list_lock);

	return osc_shrink_grant_to_target(cli, target_bytes);
}
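/*
 * Release grant back to the server until only @target_bytes remain. For
 * example, with 8 RPCs in flight and 1 MiB RPCs, osc_shrink_grant() above
 * first targets (8 + 1) MiB; once already at that floor it targets a single
 * 1 MiB RPC worth of grant. The released amount is reported to the server
 * via a KEY_GRANT_SHRINK set_info RPC, and is re-added to the local
 * available grant if that RPC fails.
 */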
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
	int rc = 0;
	struct ost_body *body;

	ENTRY;

	spin_lock(&cli->cl_loi_list_lock);
	/* Don't shrink if we are already above or below the desired limit.
	 * We don't want to shrink below a single RPC, as that will negatively
	 * impact block allocation and long-term performance. */
	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

	if (target_bytes >= cli->cl_avail_grant) {
		spin_unlock(&cli->cl_loi_list_lock);
		RETURN(0);
	}
	spin_unlock(&cli->cl_loi_list_lock);

	OBD_ALLOC_PTR(body);
	if (!body)
		RETURN(-ENOMEM);

	osc_announce_cached(cli, &body->oa, 0);

	spin_lock(&cli->cl_loi_list_lock);
	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
	cli->cl_avail_grant = target_bytes;
	spin_unlock(&cli->cl_loi_list_lock);
	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
		body->oa.o_valid |= OBD_MD_FLFLAGS;
		body->oa.o_flags = 0;
	}
	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);

	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
				sizeof(*body), body, NULL);
	if (rc != 0)
		__osc_update_grant(cli, body->oa.o_grant);
	OBD_FREE_PTR(body);
	RETURN(rc);
}
static int osc_should_shrink_grant(struct client_obd *client)
{
	time64_t next_shrink = client->cl_next_shrink_grant;

	if (client->cl_import == NULL)
		return 0;

	if ((client->cl_import->imp_connect_data.ocd_connect_flags &
	     OBD_CONNECT_GRANT_SHRINK) == 0)
		return 0;

	if (ktime_get_seconds() >= next_shrink - 5) {
		/* Get the current RPC size directly, instead of going via:
		 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
		 * Keep comment here so that it can be found by searching. */
		int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

		if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
		    client->cl_avail_grant > brw_size)
			return 1;

		osc_update_next_shrink(client);
	}
	return 0;
}
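/*
 * Periodic grant-shrink worker: walk the global client list, send at most
 * GRANT_SHRINK_RPC_BATCH shrink RPCs per pass, and re-arm the delayed work
 * item for the earliest cl_next_shrink_grant deadline found.
 */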
#define GRANT_SHRINK_RPC_BATCH	100

static struct delayed_work work;

static void osc_grant_work_handler(struct work_struct *data)
{
	struct client_obd *cli;
	int rpc_sent = 0;
	bool init_next_shrink = true;
	time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

	mutex_lock(&client_gtd.gtd_mutex);
	list_for_each_entry(cli, &client_gtd.gtd_clients,
			    cl_grant_chain) {
		if (++rpc_sent < GRANT_SHRINK_RPC_BATCH &&
		    osc_should_shrink_grant(cli))
			osc_shrink_grant(cli);

		if (!init_next_shrink) {
			if (cli->cl_next_shrink_grant < next_shrink &&
			    cli->cl_next_shrink_grant > ktime_get_seconds())
				next_shrink = cli->cl_next_shrink_grant;
		} else {
			init_next_shrink = false;
			next_shrink = cli->cl_next_shrink_grant;
		}
	}
	mutex_unlock(&client_gtd.gtd_mutex);

	if (client_gtd.gtd_stopped == 1)
		return;

	if (next_shrink > ktime_get_seconds())
		schedule_delayed_work(&work, msecs_to_jiffies(
					(next_shrink - ktime_get_seconds()) *
					MSEC_PER_SEC));
	else
		schedule_work(&work.work);
}
/**
 * Start grant thread for returning grant to server for idle clients.
 */
static int osc_start_grant_work(void)
{
	client_gtd.gtd_stopped = 0;
	mutex_init(&client_gtd.gtd_mutex);
	INIT_LIST_HEAD(&client_gtd.gtd_clients);

	INIT_DELAYED_WORK(&work, osc_grant_work_handler);
	schedule_work(&work.work);

	return 0;
}

static void osc_stop_grant_work(void)
{
	client_gtd.gtd_stopped = 1;
	cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
	mutex_lock(&client_gtd.gtd_mutex);
	list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
	mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
	if (list_empty(&client->cl_grant_chain))
		return;

	mutex_lock(&client_gtd.gtd_mutex);
	list_del_init(&client->cl_grant_chain);
	mutex_unlock(&client_gtd.gtd_mutex);
}
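/*
 * Initialize grant accounting at connect time from the server-provided
 * obd_connect_data. When GRANT_PARAM is negotiated, the RPC size is also
 * aligned to the server's block-size chunk: for example, with
 * ocd_grant_blkbits = 16 and PAGE_SHIFT = 12, a chunk is 16 pages, and
 * (nrpages + ~chunk_mask) & chunk_mask rounds cl_max_pages_per_rpc up to
 * a multiple of 16 pages.
 */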
void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
	/*
	 * ocd_grant is the total grant amount we're expected to hold: if we've
	 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
	 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
	 * dirty.
	 *
	 * race is tolerable here: if we're evicted, but imp_state already
	 * left EVICTED state, then cl_dirty_pages must be 0 already.
	 */
	spin_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant = ocd->ocd_grant;
	if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
		cli->cl_avail_grant -= cli->cl_reserved_grant;
		if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
			cli->cl_avail_grant -= cli->cl_dirty_grant;
		else
			cli->cl_avail_grant -=
					cli->cl_dirty_pages << PAGE_SHIFT;
	}

	if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
		u64 size;
		int chunk_mask;

		/* overhead for each extent insertion */
		cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
		/* determine the appropriate chunk size used by osc_extent. */
		cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
					  ocd->ocd_grant_blkbits);
		/* max_pages_per_rpc must be chunk aligned */
		chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
		cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
					     ~chunk_mask) & chunk_mask;
		/* determine maximum extent size, in #pages */
		size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
		cli->cl_max_extent_pages = size >> PAGE_SHIFT;
		if (cli->cl_max_extent_pages == 0)
			cli->cl_max_extent_pages = 1;
	} else {
		cli->cl_grant_extent_tax = 0;
		cli->cl_chunkbits = PAGE_SHIFT;
		cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
	}
	spin_unlock(&cli->cl_loi_list_lock);

	CDEBUG(D_CACHE,
	       "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d cl_max_extent_pages: %d\n",
	       cli_name(cli),
	       cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
	       cli->cl_max_extent_pages);

	if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
		osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
			      struct brw_page **pga)
{
	char *ptr;
	int i = 0;

	/* skip bytes read OK */
	while (nob_read > 0) {
		LASSERT(page_count > 0);

		if (pga[i]->count > nob_read) {
			/* EOF inside this page */
			ptr = kmap(pga[i]->pg) +
				(pga[i]->off & ~PAGE_MASK);
			memset(ptr + nob_read, 0, pga[i]->count - nob_read);
			kunmap(pga[i]->pg);
			page_count--;
			i++;
			break;
		}

		nob_read -= pga[i]->count;
		page_count--;
		i++;
	}

	/* zero remaining pages */
	while (page_count-- > 0) {
		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
		memset(ptr, 0, pga[i]->count);
		kunmap(pga[i]->pg);
		i++;
	}
}
static int check_write_rcs(struct ptlrpc_request *req,
			   int requested_nob, int niocount,
			   size_t page_count, struct brw_page **pga)
{
	int i;
	__u32 *remote_rcs;

	remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
						  sizeof(*remote_rcs) *
						  niocount);
	if (remote_rcs == NULL) {
		CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
		return -EPROTO;
	}

	/* return error if any niobuf was in error */
	for (i = 0; i < niocount; i++) {
		if ((int)remote_rcs[i] < 0)
			return remote_rcs[i];

		if (remote_rcs[i] != 0) {
			CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
			       i, remote_rcs[i], req);
			return -EPROTO;
		}
	}
	if (req->rq_bulk != NULL &&
	    req->rq_bulk->bd_nob_transferred != requested_nob) {
		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
		       req->rq_bulk->bd_nob_transferred, requested_nob);
		return -EPROTO;
	}

	return 0;
}
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
	if (p1->flag != p2->flag) {
		unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
				  OBD_BRW_SYNC | OBD_BRW_ASYNC |
				  OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);

		/* warn if we try to combine flags that we don't know to be
		 * safe to combine */
		if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
			CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
			      "report this at https://jira.hpdd.intel.com/\n",
			      p1->flag, p2->flag);
		}
		return 0;
	}

	return (p1->off + p1->count == p2->off);
}
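/*
 * T10-PI bulk checksum: generate per-sector DIF guard tags for each page
 * with @fn, collect them into a spare page, and hash the accumulated tag
 * array (using the OBD_CKSUM_T10_TOP algorithm) whenever it fills up, so
 * the final checksum covers the guard tags rather than the raw data.
 */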
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
				   size_t pg_count, struct brw_page **pga,
				   int opc, obd_dif_csum_fn *fn,
				   int sector_size, u32 *check_sum)
{
	struct cfs_crypto_hash_desc *hdesc;
	/* Used Adler as the default checksum type on top of DIF tags */
	unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
	struct page *__page;
	unsigned char *buffer;
	__u16 *guard_start;
	unsigned int bufsize;
	int guard_number;
	int used_number = 0;
	int used;
	u32 cksum;
	int rc = 0;
	int i = 0;

	LASSERT(pg_count > 0);

	__page = alloc_page(GFP_KERNEL);
	if (__page == NULL)
		return -ENOMEM;

	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(hdesc)) {
		rc = PTR_ERR(hdesc);
		CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
		       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
		GOTO(out, rc);
	}

	buffer = kmap(__page);
	guard_start = (__u16 *)buffer;
	guard_number = PAGE_SIZE / sizeof(*guard_start);
	while (nob > 0 && pg_count > 0) {
		unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (unlikely(i == 0 && opc == OST_READ &&
			     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~PAGE_MASK;

			memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
			kunmap(pga[i]->pg);
		}

		/*
		 * The left guard number should be able to hold checksums of a
		 * whole page.
		 */
		rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg, 0,
						  count,
						  guard_start + used_number,
						  guard_number - used_number,
						  &used, sector_size, fn);
		if (rc)
			break;

		used_number += used;
		if (used_number == guard_number) {
			cfs_crypto_hash_update_page(hdesc, __page, 0,
				used_number * sizeof(*guard_start));
			used_number = 0;
		}

		nob -= pga[i]->count;
		pg_count--;
		i++;
	}
	kunmap(__page);
	if (rc)
		GOTO(out, rc);

	if (used_number != 0)
		cfs_crypto_hash_update_page(hdesc, __page, 0,
			used_number * sizeof(*guard_start));

	bufsize = sizeof(cksum);
	cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		cksum++;

	*check_sum = cksum;
out:
	__free_page(__page);
	return rc;
}
static int osc_checksum_bulk(int nob, size_t pg_count,
			     struct brw_page **pga, int opc,
			     enum cksum_types cksum_type,
			     u32 *cksum)
{
	int i = 0;
	struct cfs_crypto_hash_desc *hdesc;
	unsigned int bufsize;
	unsigned char cfs_alg = cksum_obd2cfs(cksum_type);

	LASSERT(pg_count > 0);

	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(hdesc)) {
		CERROR("Unable to initialize checksum hash %s\n",
		       cfs_crypto_hash_name(cfs_alg));
		return PTR_ERR(hdesc);
	}

	while (nob > 0 && pg_count > 0) {
		unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (i == 0 && opc == OST_READ &&
		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~PAGE_MASK;

			memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
			kunmap(pga[i]->pg);
		}
		cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
					    pga[i]->off & ~PAGE_MASK,
					    count);
		LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
			       (int)(pga[i]->off & ~PAGE_MASK));

		nob -= pga[i]->count;
		pg_count--;
		i++;
	}

	bufsize = sizeof(*cksum);
	cfs_crypto_hash_final(hdesc, (unsigned char *)cksum, &bufsize);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		(*cksum)++;

	return 0;
}
static int osc_checksum_bulk_rw(const char *obd_name,
				enum cksum_types cksum_type,
				int nob, size_t pg_count,
				struct brw_page **pga, int opc,
				u32 *check_sum)
{
	obd_dif_csum_fn *fn = NULL;
	int sector_size = 0;
	int rc;

	ENTRY;

	obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

	if (fn)
		rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
					     opc, fn, sector_size, check_sum);
	else
		rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
				       check_sum);

	RETURN(rc);
}
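/*
 * Build a BRW (bulk read/write) request: merge contiguous pages into
 * niobufs, choose between a bulk descriptor and an inline "short io"
 * buffer, pack the wire obdo plus grant and checksum information, and
 * stash the transfer description in the request's async args for the
 * interpret/redo callbacks.
 */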
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
		     u32 page_count, struct brw_page **pga,
		     struct ptlrpc_request **reqp, int resend)
{
	struct ptlrpc_request *req;
	struct ptlrpc_bulk_desc *desc;
	struct ost_body *body;
	struct obd_ioobj *ioobj;
	struct niobuf_remote *niobuf;
	int niocount, i, requested_nob, opc, rc, short_io_size = 0;
	struct osc_brw_async_args *aa;
	struct req_capsule *pill;
	struct brw_page *pg_prev;
	void *short_io_buf;
	const char *obd_name = cli->cl_import->imp_obd->obd_name;

	ENTRY;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
		RETURN(-ENOMEM); /* Recoverable */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
		RETURN(-EINVAL); /* Fatal */

	if ((cmd & OBD_BRW_WRITE) != 0) {
		opc = OST_WRITE;
		req = ptlrpc_request_alloc_pool(cli->cl_import,
						osc_rq_pool,
						&RQF_OST_BRW_WRITE);
	} else {
		opc = OST_READ;
		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
	}
	if (req == NULL)
		RETURN(-ENOMEM);

	for (niocount = i = 1; i < page_count; i++) {
		if (!can_merge_pages(pga[i - 1], pga[i]))
			niocount++;
	}

	pill = &req->rq_pill;
	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
			     sizeof(*ioobj));
	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
			     niocount * sizeof(*niobuf));

	for (i = 0; i < page_count; i++)
		short_io_size += pga[i]->count;

	/* Check if we can do a short io. */
	if (!(short_io_size <= cli->cl_short_io_bytes && niocount == 1 &&
	      imp_connect_shortio(cli->cl_import)))
		short_io_size = 0;

	req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
			     opc == OST_READ ? 0 : short_io_size);
	if (opc == OST_READ)
		req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
				     short_io_size);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	osc_set_io_portal(req);

	ptlrpc_at_set_req_timeout(req);
	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
	 * retry logic */
	req->rq_no_retry_einprogress = 1;

	if (short_io_size != 0) {
		desc = NULL;
		short_io_buf = NULL;
		goto no_bulk;
	}

	desc = ptlrpc_prep_bulk_imp(req, page_count,
		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
		(opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
			PTLRPC_BULK_PUT_SINK) |
			PTLRPC_BULK_BUF_KIOV,
		OST_BULK_PORTAL,
		&ptlrpc_bulk_kiov_pin_ops);

	if (desc == NULL)
		GOTO(out, rc = -ENOMEM);
	/* NB request now owns desc and will free it when it gets freed */
no_bulk:
	body = req_capsule_client_get(pill, &RMF_OST_BODY);
	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	/* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
	 * and from_kgid(), because they are asynchronous. Fortunately, variable
	 * oa contains valid o_uid and o_gid in these two operations.
	 * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
	 * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid breaking
	 * other process logic */
	body->oa.o_uid = oa->o_uid;
	body->oa.o_gid = oa->o_gid;

	obdo_to_ioobj(oa, ioobj);
	ioobj->ioo_bufcnt = niocount;
	/* The high bits of ioo_max_brw tells server _maximum_ number of bulks
	 * that might be sent for this request.  The actual number is decided
	 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
	 * "max - 1" for old client compatibility sending "0", and also so that
	 * the actual maximum is a power-of-two number, not one less. LU-1431 */
	if (desc != NULL)
		ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
	else /* short io */
		ioobj_max_brw_set(ioobj, 0);

	if (short_io_size != 0) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_SHORT_IO;
		CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
		       short_io_size);
		if (opc == OST_WRITE) {
			short_io_buf = req_capsule_client_get(pill,
							      &RMF_SHORT_IO);
			LASSERT(short_io_buf != NULL);
		}
	}

	LASSERT(page_count > 0);
	pg_prev = pga[0];
	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
		struct brw_page *pg = pga[i];
		int poff = pg->off & ~PAGE_MASK;

		LASSERT(pg->count > 0);
		/* make sure there is no gap in the middle of page array */
		LASSERTF(page_count == 1 ||
			 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
			  ergo(i > 0 && i < page_count - 1,
			       poff == 0 && pg->count == PAGE_SIZE) &&
			  ergo(i == page_count - 1, poff == 0)),
			 "i: %d/%d pg: %p off: %llu, count: %u\n",
			 i, page_count, pg, pg->off, pg->count);
		LASSERTF(i == 0 || pg->off > pg_prev->off,
			 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
			 " prev_pg %p [pri %lu ind %lu] off %llu\n",
			 i, page_count,
			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
			 pg_prev->pg, page_private(pg_prev->pg),
			 pg_prev->pg->index, pg_prev->off);
		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
			(pg->flag & OBD_BRW_SRVLOCK));
		if (short_io_size != 0 && opc == OST_WRITE) {
			unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);

			LASSERT(short_io_size >= requested_nob + pg->count);
			memcpy(short_io_buf + requested_nob,
			       ptr + poff,
			       pg->count);
			ll_kunmap_atomic(ptr, KM_USER0);
		} else if (short_io_size == 0) {
			desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
							 pg->count);
		}
		requested_nob += pg->count;

		if (i > 0 && can_merge_pages(pg_prev, pg)) {
			niobuf--;
			niobuf->rnb_len += pg->count;
		} else {
			niobuf->rnb_offset = pg->off;
			niobuf->rnb_len    = pg->count;
			niobuf->rnb_flags  = pg->flag;
		}
		pg_prev = pg;
	}

	LASSERTF((void *)(niobuf - niocount) ==
		 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
		 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
		 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
	if (resend) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
	}

	if (osc_should_shrink_grant(cli))
		osc_shrink_grant_local(cli, &body->oa);

	/* size[REQ_REC_OFF] still sizeof (*body) */
	if (opc == OST_WRITE) {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			/* store cl_cksum_type in a local variable since
			 * it can be changed via lprocfs */
			enum cksum_types cksum_type = cli->cl_cksum_type;

			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;

			body->oa.o_flags |= obd_cksum_type_pack(obd_name,
								cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;

			rc = osc_checksum_bulk_rw(obd_name, cksum_type,
						  requested_nob, page_count,
						  pga, OST_WRITE,
						  &body->oa.o_cksum);
			if (rc < 0) {
				CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
				       rc);
				GOTO(out, rc);
			}
			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
			       body->oa.o_cksum);

			/* save this in 'oa', too, for later checking */
			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			oa->o_flags |= obd_cksum_type_pack(obd_name,
							   cksum_type);
		} else {
			/* clear out the checksum flag, in case this is a
			 * resend but cl_checksum is no longer set. b=11238 */
			oa->o_valid &= ~OBD_MD_FLCKSUM;
		}
		oa->o_cksum = body->oa.o_cksum;
		/* 1 RC per niobuf */
		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
				     sizeof(__u32) * niocount);
	} else {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;
			body->oa.o_flags |= obd_cksum_type_pack(obd_name,
							cli->cl_cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
		}

		/* Client cksum has been already copied to wire obdo in previous
		 * lustre_set_wire_obdo(), and in the case a bulk-read is being
		 * resent due to cksum error, this will allow Server to
		 * check+dump pages on its side */
	}
	ptlrpc_request_set_replen(req);

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oa = oa;
	aa->aa_requested_nob = requested_nob;
	aa->aa_nio_count = niocount;
	aa->aa_page_count = page_count;
	aa->aa_resends = 0;
	aa->aa_ppga = pga;
	aa->aa_cli = cli;
	INIT_LIST_HEAD(&aa->aa_oaps);

	*reqp = req;
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
		req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
		niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
	RETURN(0);

out:
	ptlrpc_req_finished(req);
	RETURN(rc);
}
char dbgcksum_file_name[PATH_MAX];

static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
				struct brw_page **pga, __u32 server_cksum,
				__u32 client_cksum)
{
	struct file *filp;
	int rc, i;
	unsigned int len;
	char *buf;

	/* will only keep dump of pages on first error for the same range in
	 * file/fid, not during the resends/retries. */
	snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
		 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
		 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
		  libcfs_debug_file_path_arr :
		  LIBCFS_DEBUG_FILE_PATH_DEFAULT),
		 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
		 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
		 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
		 pga[0]->off,
		 pga[page_count-1]->off + pga[page_count-1]->count - 1,
		 client_cksum, server_cksum);
	filp = filp_open(dbgcksum_file_name,
			 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
	if (IS_ERR(filp)) {
		rc = PTR_ERR(filp);
		if (rc == -EEXIST)
			CDEBUG(D_INFO, "%s: can't open to dump pages with "
			       "checksum error: rc = %d\n", dbgcksum_file_name,
			       rc);
		else
			CERROR("%s: can't open to dump pages with checksum "
			       "error: rc = %d\n", dbgcksum_file_name, rc);
		return;
	}

	for (i = 0; i < page_count; i++) {
		len = pga[i]->count;
		buf = kmap(pga[i]->pg);
		while (len != 0) {
			rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
			if (rc < 0) {
				CERROR("%s: wanted to write %u but got %d "
				       "error\n", dbgcksum_file_name, len, rc);
				break;
			}
			len -= rc;
			buf += rc;
			CDEBUG(D_INFO, "%s: wrote %d bytes\n",
			       dbgcksum_file_name, rc);
		}
		kunmap(pga[i]->pg);
	}

	rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
	if (rc)
		CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
	filp_close(filp, NULL);
}
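/*
 * A write checksum mismatch was reported by the server. Recompute the
 * checksum over the local pages and compare it against both the value we
 * originally sent and the server's value to guess where the data changed
 * (on the client after checksumming, in transit, or both), then log the
 * verdict. Returns 1 so the caller treats the BRW as needing a resend.
 */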
static int
check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
		     __u32 client_cksum, __u32 server_cksum,
		     struct osc_brw_async_args *aa)
{
	const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
	enum cksum_types cksum_type;
	obd_dif_csum_fn *fn = NULL;
	int sector_size = 0;
	__u32 new_cksum;
	char *msg;
	int rc;

	if (server_cksum == client_cksum) {
		CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
		return 0;
	}

	if (aa->aa_cli->cl_checksum_dump)
		dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
				    server_cksum, client_cksum);

	cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
					   oa->o_flags : 0);

	switch (cksum_type) {
	case OBD_CKSUM_T10IP512:
		fn = obd_dif_ip_fn;
		sector_size = 512;
		break;
	case OBD_CKSUM_T10IP4K:
		fn = obd_dif_ip_fn;
		sector_size = 4096;
		break;
	case OBD_CKSUM_T10CRC512:
		fn = obd_dif_crc_fn;
		sector_size = 512;
		break;
	case OBD_CKSUM_T10CRC4K:
		fn = obd_dif_crc_fn;
		sector_size = 4096;
		break;
	default:
		break;
	}

	if (fn)
		rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
					     aa->aa_page_count, aa->aa_ppga,
					     OST_WRITE, fn, sector_size,
					     &new_cksum);
	else
		rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
				       aa->aa_ppga, OST_WRITE, cksum_type,
				       &new_cksum);

	if (rc < 0)
		msg = "failed to calculate the client write checksum";
	else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
		msg = "the server did not use the checksum type specified in "
		      "the original request - likely a protocol problem";
	else if (new_cksum == server_cksum)
		msg = "changed on the client after we checksummed it - "
		      "likely false positive due to mmap IO (bug 11742)";
	else if (new_cksum == client_cksum)
		msg = "changed in transit before arrival at OST";
	else
		msg = "changed in transit AND doesn't match the original - "
		      "likely false positive due to mmap IO (bug 11742)";

	LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
			   DFID " object "DOSTID" extent [%llu-%llu], original "
			   "client csum %x (type %x), server csum %x (type %x),"
			   " client csum now %x\n",
			   obd_name, msg, libcfs_nid2str(peer->nid),
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
			   POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
			   aa->aa_ppga[aa->aa_page_count - 1]->off +
			   aa->aa_ppga[aa->aa_page_count-1]->count - 1,
			   client_cksum,
			   obd_cksum_type_unpack(aa->aa_oa->o_flags),
			   server_cksum, cksum_type, new_cksum);
	return 1;
}
/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
	struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
	struct client_obd *cli = aa->aa_cli;
	const char *obd_name = cli->cl_import->imp_obd->obd_name;
	const struct lnet_process_id *peer =
		&req->rq_import->imp_connection->c_peer;
	struct ost_body *body;
	u32 client_cksum = 0;

	ENTRY;

	if (rc < 0 && rc != -EDQUOT) {
		DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
		RETURN(rc);
	}

	LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
		RETURN(-EPROTO);
	}

	/* set/clear over quota flag for a uid/gid/projid */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
	    body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
		unsigned qid[LL_MAXQUOTAS] = {
					 body->oa.o_uid, body->oa.o_gid,
					 body->oa.o_projid };
		CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
		       body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
		       body->oa.o_valid, body->oa.o_flags);
		osc_quota_setdq(cli, qid, body->oa.o_valid,
				body->oa.o_flags);
	}

	osc_update_grant(cli, body);

	if (rc < 0)
		RETURN(rc);

	if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
		client_cksum = aa->aa_oa->o_cksum; /* save for later */

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
		if (rc > 0) {
			CERROR("Unexpected +ve rc %d\n", rc);
			RETURN(-EPROTO);
		}

		if (req->rq_bulk != NULL &&
		    sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
			RETURN(-EAGAIN);

		if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
		    check_write_checksum(&body->oa, peer, client_cksum,
					 body->oa.o_cksum, aa))
			RETURN(-EAGAIN);

		rc = check_write_rcs(req, aa->aa_requested_nob,
				     aa->aa_nio_count,
				     aa->aa_page_count, aa->aa_ppga);
		GOTO(out, rc);
	}

	/* The rest of this function executes only for OST_READs */

	if (req->rq_bulk == NULL) {
		rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
					  RCL_SERVER);
		LASSERT(rc == req->rq_status);
	} else {
		/* if unwrap_bulk failed, return -EAGAIN to retry */
		rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
	}
	if (rc < 0)
		GOTO(out, rc = -EAGAIN);

	if (rc > aa->aa_requested_nob) {
		CERROR("Unexpected rc %d (%d requested)\n", rc,
		       aa->aa_requested_nob);
		RETURN(-EPROTO);
	}

	if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
		CERROR("Unexpected rc %d (%d transferred)\n",
		       rc, req->rq_bulk->bd_nob_transferred);
		RETURN(-EPROTO);
	}

	if (req->rq_bulk == NULL) {
		/* short io */
		int nob, pg_count, i = 0;
		unsigned char *buf;

		CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
		pg_count = aa->aa_page_count;
		buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
						   rc);
		nob = rc;
		while (nob > 0 && pg_count > 0) {
			unsigned char *ptr;
			int count = aa->aa_ppga[i]->count > nob ?
				    nob : aa->aa_ppga[i]->count;

			CDEBUG(D_CACHE, "page %p count %d\n",
			       aa->aa_ppga[i]->pg, count);
			ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
			memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
			       count);
			ll_kunmap_atomic((void *) ptr, KM_USER0);

			buf += count;
			nob -= count;
			i++;
			pg_count--;
		}
	}

	if (rc < aa->aa_requested_nob)
		handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

	if (body->oa.o_valid & OBD_MD_FLCKSUM) {
		static int cksum_counter;
		u32 server_cksum = body->oa.o_cksum;
		char *via = "";
		char *router = "";
		enum cksum_types cksum_type;
		u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
			      body->oa.o_flags : 0;

		cksum_type = obd_cksum_type_unpack(o_flags);
		rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
					  aa->aa_page_count, aa->aa_ppga,
					  OST_READ, &client_cksum);
		if (rc < 0)
			GOTO(out, rc);

		if (req->rq_bulk != NULL &&
		    peer->nid != req->rq_bulk->bd_sender) {
			via = " via ";
			router = libcfs_nid2str(req->rq_bulk->bd_sender);
		}

		if (server_cksum != client_cksum) {
			struct ost_body *clbody;
			u32 page_count = aa->aa_page_count;

			clbody = req_capsule_client_get(&req->rq_pill,
							&RMF_OST_BODY);
			if (cli->cl_checksum_dump)
				dump_all_bulk_pages(&clbody->oa, page_count,
						    aa->aa_ppga, server_cksum,
						    client_cksum);

			LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
					   "%s%s%s inode "DFID" object "DOSTID
					   " extent [%llu-%llu], client %x, "
					   "server %x, cksum_type %x\n",
					   obd_name,
					   libcfs_nid2str(peer->nid),
					   via, router,
					   clbody->oa.o_valid & OBD_MD_FLFID ?
						clbody->oa.o_parent_seq : 0ULL,
					   clbody->oa.o_valid & OBD_MD_FLFID ?
						clbody->oa.o_parent_oid : 0,
					   clbody->oa.o_valid & OBD_MD_FLFID ?
						clbody->oa.o_parent_ver : 0,
					   POSTID(&body->oa.o_oi),
					   aa->aa_ppga[0]->off,
					   aa->aa_ppga[page_count-1]->off +
					   aa->aa_ppga[page_count-1]->count - 1,
					   client_cksum, server_cksum,
					   cksum_type);
			cksum_counter = 0;
			aa->aa_oa->o_cksum = client_cksum;
			rc = -EAGAIN;
		} else {
			cksum_counter++;
			CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
			rc = 0;
		}
	} else if (unlikely(client_cksum)) {
		static int cksum_missed;

		cksum_missed++;
		if ((cksum_missed & (-cksum_missed)) == cksum_missed)
			CERROR("Checksum %u requested from %s but not sent\n",
			       cksum_missed, libcfs_nid2str(peer->nid));
	} else {
		rc = 0;
	}
out:
	if (rc >= 0)
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oa, &body->oa);

	RETURN(rc);
}
static int osc_brw_redo_request(struct ptlrpc_request *request,
				struct osc_brw_async_args *aa, int rc)
{
	struct ptlrpc_request *new_req;
	struct osc_brw_async_args *new_aa;
	struct osc_async_page *oap;

	ENTRY;

	DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
		  "redo for recoverable error %d", rc);

	rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
				  OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
				  aa->aa_cli, aa->aa_oa, aa->aa_page_count,
				  aa->aa_ppga, &new_req, 1);
	if (rc)
		RETURN(rc);

	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request != NULL) {
			LASSERTF(request == oap->oap_request,
				 "request %p != oap_request %p\n",
				 request, oap->oap_request);
			if (oap->oap_interrupted) {
				ptlrpc_req_finished(new_req);
				RETURN(-EINTR);
			}
		}
	}
	/* New request takes over pga and oaps from old request.
	 * Note that copying a list_head doesn't work, need to move it... */
	aa->aa_resends++;
	new_req->rq_interpret_reply = request->rq_interpret_reply;
	new_req->rq_async_args = request->rq_async_args;
	new_req->rq_commit_cb = request->rq_commit_cb;
	/* cap resend delay to the current request timeout, this is similar to
	 * what ptlrpc does (see after_reply()) */
	if (aa->aa_resends > new_req->rq_timeout)
		new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
	else
		new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
	new_req->rq_generation_set = 1;
	new_req->rq_import_generation = request->rq_import_generation;

	new_aa = ptlrpc_req_async_args(new_req);

	INIT_LIST_HEAD(&new_aa->aa_oaps);
	list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
	INIT_LIST_HEAD(&new_aa->aa_exts);
	list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
	new_aa->aa_resends = aa->aa_resends;

	list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request) {
			ptlrpc_req_finished(oap->oap_request);
			oap->oap_request = ptlrpc_request_addref(new_req);
		}
	}

	/* XXX: This code will run into problem if we're going to support
	 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
	 * and wait for all of them to be finished. We should inherit request
	 * set from old request. */
	ptlrpcd_add_req(new_req);

	DEBUG_REQ(D_INFO, new_req, "new request");
	RETURN(0);
}
/*
 * ugh, we want disk allocation on the target to happen in offset order. We'll
 * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation. It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
{
	int stride, i, j;
	struct brw_page *tmp;

	if (num == 1)
		return;
	for (stride = 1; stride < num ; stride = (stride * 3) + 1)
		;

	do {
		stride /= 3;
		for (i = stride ; i < num ; i++) {
			tmp = array[i];
			j = i;
			while (j >= stride && array[j - stride]->off > tmp->off) {
				array[j] = array[j - stride];
				j -= stride;
			}
			array[j] = tmp;
		}
	} while (stride > 1);
}
static void osc_release_ppga(struct brw_page **ppga, size_t count)
{
	LASSERT(ppga != NULL);
	OBD_FREE(ppga, sizeof(*ppga) * count);
}
static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *data, int rc)
{
	struct osc_brw_async_args *aa = data;
	struct osc_extent *ext;
	struct osc_extent *tmp;
	struct client_obd *cli = aa->aa_cli;
	unsigned long transferred = 0;

	ENTRY;

	rc = osc_brw_fini_request(req, rc);
	CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
	/* When the server returns -EINPROGRESS, the client should always retry
	 * regardless of the number of times the bulk was resent already. */
	if (osc_recoverable_error(rc) && !req->rq_no_delay) {
		if (req->rq_import_generation !=
		    req->rq_import->imp_generation) {
			CDEBUG(D_HA, "%s: resend cross eviction for object: "
			       ""DOSTID", rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		} else if (rc == -EINPROGRESS ||
			   client_should_resend(aa->aa_resends, aa->aa_cli)) {
			rc = osc_brw_redo_request(req, aa, rc);
		} else {
			CERROR("%s: too many resent retries for object: "
			       "%llu:%llu, rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		}

		if (rc == 0)
			RETURN(0);
		else if (rc == -EAGAIN || rc == -EINPROGRESS)
			rc = -EIO;
	}

	if (rc == 0) {
		struct obdo *oa = aa->aa_oa;
		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
		unsigned long valid = 0;
		struct cl_object *obj;
		struct osc_async_page *last;

		last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
		obj = osc2cl(last->oap_obj);

		cl_object_attr_lock(obj);
		if (oa->o_valid & OBD_MD_FLBLOCKS) {
			attr->cat_blocks = oa->o_blocks;
			valid |= CAT_BLOCKS;
		}
		if (oa->o_valid & OBD_MD_FLMTIME) {
			attr->cat_mtime = oa->o_mtime;
			valid |= CAT_MTIME;
		}
		if (oa->o_valid & OBD_MD_FLATIME) {
			attr->cat_atime = oa->o_atime;
			valid |= CAT_ATIME;
		}
		if (oa->o_valid & OBD_MD_FLCTIME) {
			attr->cat_ctime = oa->o_ctime;
			valid |= CAT_CTIME;
		}

		if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
			struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
			loff_t last_off = last->oap_count + last->oap_obj_off +
				last->oap_page_off;

			/* Change file size if this is an out of quota or
			 * direct IO write and it extends the file size */
			if (loi->loi_lvb.lvb_size < last_off) {
				attr->cat_size = last_off;
				valid |= CAT_SIZE;
			}
			/* Extend KMS if it's not a lockless write */
			if (loi->loi_kms < last_off &&
			    oap2osc_page(last)->ops_srvlock == 0) {
				attr->cat_kms = last_off;
				valid |= CAT_KMS;
			}
		}

		if (valid != 0)
			cl_object_attr_update(env, obj, attr, valid);
		cl_object_attr_unlock(obj);
	}
	OBDO_FREE(aa->aa_oa);

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
		osc_inc_unstable_pages(req);

	list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
		list_del_init(&ext->oe_link);
		osc_extent_finish(env, ext, 1,
				  rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
	}
	LASSERT(list_empty(&aa->aa_exts));
	LASSERT(list_empty(&aa->aa_oaps));

	transferred = (req->rq_bulk == NULL ? /* short io */
		       aa->aa_requested_nob :
		       req->rq_bulk->bd_nob_transferred);

	osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
	ptlrpc_lprocfs_brw(req, transferred);

	spin_lock(&cli->cl_loi_list_lock);
	/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
	 * is called so we know whether to go to sync BRWs or wait for more
	 * RPCs to complete */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
		cli->cl_w_in_flight--;
	else
		cli->cl_r_in_flight--;
	osc_wake_cache_waiters(cli);
	spin_unlock(&cli->cl_loi_list_lock);

	osc_io_unplug(env, cli, NULL);
	RETURN(rc);
}
static void brw_commit(struct ptlrpc_request *req)
{
	/* If osc_inc_unstable_pages (via osc_extent_finish) races with
	 * this called via the rq_commit_cb, I need to ensure
	 * osc_dec_unstable_pages is still called. Otherwise unstable
	 * pages may be leaked. */
	spin_lock(&req->rq_lock);
	if (likely(req->rq_unstable)) {
		req->rq_unstable = 0;
		spin_unlock(&req->rq_lock);

		osc_dec_unstable_pages(req);
	} else {
		req->rq_committed = 1;
		spin_unlock(&req->rq_lock);
	}
}
2156 * Build an RPC by the list of extent @ext_list. The caller must ensure
2157 * that the total pages in this list are NOT over max pages per RPC.
2158 * Extents in the list must be in OES_RPC state.
2160 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2161 struct list_head *ext_list, int cmd)
2163 struct ptlrpc_request *req = NULL;
2164 struct osc_extent *ext;
2165 struct brw_page **pga = NULL;
2166 struct osc_brw_async_args *aa = NULL;
2167 struct obdo *oa = NULL;
2168 struct osc_async_page *oap;
2169 struct osc_object *obj = NULL;
2170 struct cl_req_attr *crattr = NULL;
2171 loff_t starting_offset = OBD_OBJECT_EOF;
2172 loff_t ending_offset = 0;
2176 bool soft_sync = false;
2177 bool interrupted = false;
2178 bool ndelay = false;
2182 __u32 layout_version = 0;
2183 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
2184 struct ost_body *body;
2186 LASSERT(!list_empty(ext_list));
2188 /* add pages into rpc_list to build BRW rpc */
2189 list_for_each_entry(ext, ext_list, oe_link) {
2190 LASSERT(ext->oe_state == OES_RPC);
2191 mem_tight |= ext->oe_memalloc;
2192 grant += ext->oe_grants;
2193 page_count += ext->oe_nr_pages;
2194 layout_version = MAX(layout_version, ext->oe_layout_version);
2199 soft_sync = osc_over_unstable_soft_limit(cli);
2201 mpflag = cfs_memory_pressure_get_and_set();
2203 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2205 GOTO(out, rc = -ENOMEM);
2209 GOTO(out, rc = -ENOMEM);
2212 list_for_each_entry(ext, ext_list, oe_link) {
2213 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2215 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2217 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2218 pga[i] = &oap->oap_brw_page;
2219 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2222 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2223 if (starting_offset == OBD_OBJECT_EOF ||
2224 starting_offset > oap->oap_obj_off)
2225 starting_offset = oap->oap_obj_off;
2227 LASSERT(oap->oap_page_off == 0);
2228 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2229 ending_offset = oap->oap_obj_off +
2232 LASSERT(oap->oap_page_off + oap->oap_count ==
2234 if (oap->oap_interrupted)
2241 /* first page in the list */
2242 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2244 crattr = &osc_env_info(env)->oti_req_attr;
2245 memset(crattr, 0, sizeof(*crattr));
2246 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2247 crattr->cra_flags = ~0ULL;
2248 crattr->cra_page = oap2cl_page(oap);
2249 crattr->cra_oa = oa;
2250 cl_req_attr_set(env, osc2cl(obj), crattr);
2252 if (cmd == OBD_BRW_WRITE) {
2253 oa->o_grant_used = grant;
2254 if (layout_version > 0) {
2255 CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2256 PFID(&oa->o_oi.oi_fid), layout_version);
2258 oa->o_layout_version = layout_version;
2259 oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2263 sort_brw_pages(pga, page_count);
2264 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2266 CERROR("prep_req failed: %d\n", rc);
2270 req->rq_commit_cb = brw_commit;
2271 req->rq_interpret_reply = brw_interpret;
2272 req->rq_memalloc = mem_tight != 0;
2273 oap->oap_request = ptlrpc_request_addref(req);
2274 if (interrupted && !req->rq_intr)
2275 ptlrpc_mark_interrupted(req);
2277 req->rq_no_resend = req->rq_no_delay = 1;
2278		/* We should probably set a shorter timeout value here,
2279		 * to handle ETIMEDOUT in brw_interpret() correctly. */
2280 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2283	/* Need to update the timestamps after the request is built, in case
2284	 * we race with setattr (locally or in the queue at the OST). If the OST
2285	 * gets the later setattr before the earlier BRW (as determined by the
2286	 * request xid), it will not use the BRW timestamps. Sadly, there is no
2287	 * obvious way to do this in a single call. bug 10150 */
2288 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2289 crattr->cra_oa = &body->oa;
2290 crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2291 cl_req_attr_set(env, osc2cl(obj), crattr);
2292 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2294 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2295 aa = ptlrpc_req_async_args(req);
2296 INIT_LIST_HEAD(&aa->aa_oaps);
2297 list_splice_init(&rpc_list, &aa->aa_oaps);
2298 INIT_LIST_HEAD(&aa->aa_exts);
2299 list_splice_init(ext_list, &aa->aa_exts);
2301 spin_lock(&cli->cl_loi_list_lock);
2302 starting_offset >>= PAGE_SHIFT;
2303 if (cmd == OBD_BRW_READ) {
2304 cli->cl_r_in_flight++;
2305 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2306 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2307 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2308 starting_offset + 1);
2310 cli->cl_w_in_flight++;
2311 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2312 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2313 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2314 starting_offset + 1);
2316 spin_unlock(&cli->cl_loi_list_lock);
2318 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2319 page_count, aa, cli->cl_r_in_flight,
2320 cli->cl_w_in_flight);
2321 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2323 ptlrpcd_add_req(req);
2329 cfs_memory_pressure_restore(mpflag);
2332 LASSERT(req == NULL);
2337 OBD_FREE(pga, sizeof(*pga) * page_count);
2338	/* This should happen rarely and is pretty bad: it makes the
2339	 * pending list not follow the dirty order. */
2340 while (!list_empty(ext_list)) {
2341 ext = list_entry(ext_list->next, struct osc_extent,
2343 list_del_init(&ext->oe_link);
2344 osc_extent_finish(env, ext, 0, rc);
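/*
 * Illustrative caller sketch (editorial; the variable names and calling
 * context are assumptions -- the in-tree callers live in the OSC cache
 * layer, reached via osc_io_unplug()).  A caller collects extents that
 * are already in OES_RPC state, keeps the total page count within the
 * per-RPC limit, and hands them over in one list:
 *
 *	struct list_head rpclist = LIST_HEAD_INIT(rpclist);
 *
 *	rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_WRITE);
 *
 * On success the extents belong to the request and are finished from
 * brw_interpret(); on failure they have already been finished above
 * with the error code.
 */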
2350 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2354 LASSERT(lock != NULL);
2356 lock_res_and_lock(lock);
2358 if (lock->l_ast_data == NULL)
2359 lock->l_ast_data = data;
2360 if (lock->l_ast_data == data)
2363 unlock_res_and_lock(lock);
2368 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2369 void *cookie, struct lustre_handle *lockh,
2370 enum ldlm_mode mode, __u64 *flags, bool speculative,
2373 bool intent = *flags & LDLM_FL_HAS_INTENT;
2377	/* The request was created before the ldlm_cli_enqueue() call. */
2378 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2379 struct ldlm_reply *rep;
2381 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2382 LASSERT(rep != NULL);
2384 rep->lock_policy_res1 =
2385 ptlrpc_status_ntoh(rep->lock_policy_res1);
2386 if (rep->lock_policy_res1)
2387 errcode = rep->lock_policy_res1;
2389 *flags |= LDLM_FL_LVB_READY;
2390 } else if (errcode == ELDLM_OK) {
2391 *flags |= LDLM_FL_LVB_READY;
2394 /* Call the update callback. */
2395 rc = (*upcall)(cookie, lockh, errcode);
2397 /* release the reference taken in ldlm_cli_enqueue() */
2398 if (errcode == ELDLM_LOCK_MATCHED)
2400 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2401 ldlm_lock_decref(lockh, mode);
2406 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2407 struct osc_enqueue_args *aa, int rc)
2409 struct ldlm_lock *lock;
2410 struct lustre_handle *lockh = &aa->oa_lockh;
2411 enum ldlm_mode mode = aa->oa_mode;
2412 struct ost_lvb *lvb = aa->oa_lvb;
2413 __u32 lvb_len = sizeof(*lvb);
2418 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2420 lock = ldlm_handle2lock(lockh);
2421 LASSERTF(lock != NULL,
2422 "lockh %#llx, req %p, aa %p - client evicted?\n",
2423 lockh->cookie, req, aa);
2425	/* Take an additional reference so that a blocking AST, which
2426	 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2427	 * to arrive after the upcall has been executed by
2428	 * osc_enqueue_fini(). */
2429 ldlm_lock_addref(lockh, mode);
2431 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2432 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2434	/* Let the CP AST grant the lock first. */
2435 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2437 if (aa->oa_speculative) {
2438 LASSERT(aa->oa_lvb == NULL);
2439 LASSERT(aa->oa_flags == NULL);
2440 aa->oa_flags = &flags;
2443 /* Complete obtaining the lock procedure. */
2444 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2445 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2447 /* Complete osc stuff. */
2448 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2449 aa->oa_flags, aa->oa_speculative, rc);
2451 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2453 ldlm_lock_decref(lockh, mode);
2454 LDLM_LOCK_PUT(lock);
2458 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2460 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2461 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2462 * other synchronous requests; however, holding some locks while trying to obtain
2463 * others may take a considerable amount of time in case of OST failure, and
2464 * when other sync requests cannot get a lock released by a client, that client
2465 * is evicted from the cluster -- such scenarios make life difficult, so we
2466 * release locks just after they are obtained. */
2467 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2468 __u64 *flags, union ldlm_policy_data *policy,
2469 struct ost_lvb *lvb, int kms_valid,
2470 osc_enqueue_upcall_f upcall, void *cookie,
2471 struct ldlm_enqueue_info *einfo,
2472 struct ptlrpc_request_set *rqset, int async,
2475 struct obd_device *obd = exp->exp_obd;
2476 struct lustre_handle lockh = { 0 };
2477 struct ptlrpc_request *req = NULL;
2478 int intent = *flags & LDLM_FL_HAS_INTENT;
2479 __u64 match_flags = *flags;
2480 enum ldlm_mode mode;
2484 /* Filesystem lock extents are extended to page boundaries so that
2485 * dealing with the page cache is a little smoother. */
2486 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2487 policy->l_extent.end |= ~PAGE_MASK;
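	/*
	 * Worked example of the rounding above (editorial): with PAGE_SIZE
	 * 4096, ~PAGE_MASK is 0xfff, so a start of 5000 becomes
	 * 5000 - (5000 & 0xfff) = 4096 (rounded down to its page boundary),
	 * and an end of 9000 becomes 9000 | 0xfff = 12287 (the last byte
	 * of its page).
	 */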
2490	 * kms is not valid when either the object is completely fresh (so that no
2491	 * locks are cached), or the object was evicted. In the latter case a cached
2492	 * lock cannot be used, because it would prime the inode state with a
2493	 * potentially stale LVB.
2498 /* Next, search for already existing extent locks that will cover us */
2499 /* If we're trying to read, we also search for an existing PW lock. The
2500 * VFS and page cache already protect us locally, so lots of readers/
2501 * writers can share a single PW lock.
2503 * There are problems with conversion deadlocks, so instead of
2504 * converting a read lock to a write lock, we'll just enqueue a new
2507 * At some point we should cancel the read lock instead of making them
2508 * send us a blocking callback, but there are problems with canceling
2509 * locks out from other users right now, too. */
2510 mode = einfo->ei_mode;
2511 if (einfo->ei_mode == LCK_PR)
2513 /* Normal lock requests must wait for the LVB to be ready before
2514 * matching a lock; speculative lock requests do not need to,
2515 * because they will not actually use the lock. */
2517 match_flags |= LDLM_FL_LVB_READY;
2519 match_flags |= LDLM_FL_BLOCK_GRANTED;
2520 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2521 einfo->ei_type, policy, mode, &lockh, 0);
2523 struct ldlm_lock *matched;
2525 if (*flags & LDLM_FL_TEST_LOCK)
2528 matched = ldlm_handle2lock(&lockh);
2530			/* This DLM lock request is speculative, and does not
2531			 * have an associated IO request. Therefore, if there
2532			 * is already a DLM lock, it will just inform the
2533			 * caller to cancel the request for this stripe. */
2534 lock_res_and_lock(matched);
2535 if (ldlm_extent_equal(&policy->l_extent,
2536 &matched->l_policy_data.l_extent))
2540 unlock_res_and_lock(matched);
2542 ldlm_lock_decref(&lockh, mode);
2543 LDLM_LOCK_PUT(matched);
2545 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2546 *flags |= LDLM_FL_LVB_READY;
2548 /* We already have a lock, and it's referenced. */
2549 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2551 ldlm_lock_decref(&lockh, mode);
2552 LDLM_LOCK_PUT(matched);
2555 ldlm_lock_decref(&lockh, mode);
2556 LDLM_LOCK_PUT(matched);
2561 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2565 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2566 &RQF_LDLM_ENQUEUE_LVB);
2570 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2572 ptlrpc_request_free(req);
2576 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2578 ptlrpc_request_set_replen(req);
2581 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2582 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2584 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2585 sizeof(*lvb), LVB_T_OST, &lockh, async);
2588 struct osc_enqueue_args *aa;
2589 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2590 aa = ptlrpc_req_async_args(req);
2592 aa->oa_mode = einfo->ei_mode;
2593 aa->oa_type = einfo->ei_type;
2594 lustre_handle_copy(&aa->oa_lockh, &lockh);
2595 aa->oa_upcall = upcall;
2596 aa->oa_cookie = cookie;
2597 aa->oa_speculative = speculative;
2599 aa->oa_flags = flags;
2602			/* speculative locks essentially enqueue
2603			 * a DLM lock in advance, so we don't care
2604			 * about the result of the enqueue. */
2606 aa->oa_flags = NULL;
2609 req->rq_interpret_reply =
2610 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2611 if (rqset == PTLRPCD_SET)
2612 ptlrpcd_add_req(req);
2614 ptlrpc_set_add_req(rqset, req);
2615 } else if (intent) {
2616 ptlrpc_req_finished(req);
2621 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2622 flags, speculative, rc);
2624 ptlrpc_req_finished(req);
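/*
 * Usage sketch (editorial, not a verbatim caller): the lock layer
 * typically enqueues asynchronously and lets ptlrpcd interpret the
 * reply by passing the PTLRPCD_SET marker, e.g.:
 *
 *	rc = osc_enqueue_base(exp, resname, &flags, policy, lvb,
 *			      kms_valid, upcall, cookie, einfo,
 *			      PTLRPCD_SET, 1, false);
 *
 * With async set, the upcall runs from osc_enqueue_interpret(); with
 * async == 0 it runs synchronously through osc_enqueue_fini() above.
 * The final argument is the speculative flag used earlier in this
 * function.
 */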
2629 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2630 enum ldlm_type type, union ldlm_policy_data *policy,
2631 enum ldlm_mode mode, __u64 *flags, void *data,
2632 struct lustre_handle *lockh, int unref)
2634 struct obd_device *obd = exp->exp_obd;
2635 __u64 lflags = *flags;
2639 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2642 /* Filesystem lock extents are extended to page boundaries so that
2643 * dealing with the page cache is a little smoother */
2644 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2645 policy->l_extent.end |= ~PAGE_MASK;
2647 /* Next, search for already existing extent locks that will cover us */
2648 /* If we're trying to read, we also search for an existing PW lock. The
2649 * VFS and page cache already protect us locally, so lots of readers/
2650 * writers can share a single PW lock. */
2654 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2655 res_id, type, policy, rc, lockh, unref);
2656 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2660 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2662 LASSERT(lock != NULL);
2663 if (!osc_set_lock_data(lock, data)) {
2664 ldlm_lock_decref(lockh, rc);
2667 LDLM_LOCK_PUT(lock);
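/*
 * Editorial example (hypothetical caller and names): matching an
 * existing lock that covers a single byte offset, keeping a reference
 * on a hit:
 *
 *	union ldlm_policy_data policy = {
 *		.l_extent = { .start = off, .end = off },
 *	};
 *	__u64 flags = 0;
 *	struct lustre_handle lockh;
 *
 *	mode = osc_match_base(exp, resname, LDLM_EXTENT, &policy,
 *			      LCK_PR, &flags, data, &lockh, 0);
 *
 * A non-zero return is the matched mode; unless LDLM_FL_TEST_LOCK was
 * set in flags, the caller then owns a reference on lockh.
 */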
2672 static int osc_statfs_interpret(const struct lu_env *env,
2673 struct ptlrpc_request *req,
2674 struct osc_async_args *aa, int rc)
2676 struct obd_statfs *msfs;
2680		/* The request has in fact never been sent,
2681		 * due to issues at a higher level (LOV).
2682		 * Exit immediately since the caller is
2683		 * aware of the problem and takes care
2684		 * of the cleanup. */
2687 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2688 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2694 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2696 GOTO(out, rc = -EPROTO);
2699 *aa->aa_oi->oi_osfs = *msfs;
2701 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2705 static int osc_statfs_async(struct obd_export *exp,
2706 struct obd_info *oinfo, time64_t max_age,
2707 struct ptlrpc_request_set *rqset)
2709 struct obd_device *obd = class_exp2obd(exp);
2710 struct ptlrpc_request *req;
2711 struct osc_async_args *aa;
2715 /* We could possibly pass max_age in the request (as an absolute
2716 * timestamp or a "seconds.usec ago") so the target can avoid doing
2717 * extra calls into the filesystem if that isn't necessary (e.g.
2718 * during mount that would help a bit). Having relative timestamps
2719 * is not so great if request processing is slow, while absolute
2720 * timestamps are not ideal because they need time synchronization. */
2721 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2725 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2727 ptlrpc_request_free(req);
2730 ptlrpc_request_set_replen(req);
2731 req->rq_request_portal = OST_CREATE_PORTAL;
2732 ptlrpc_at_set_req_timeout(req);
2734 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2735		/* procfs requests should not wait for statfs, to avoid a deadlock */
2736 req->rq_no_resend = 1;
2737 req->rq_no_delay = 1;
2740 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2741 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2742 aa = ptlrpc_req_async_args(req);
2745 ptlrpc_set_add_req(rqset, req);
2749 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2750 struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2752 struct obd_device *obd = class_exp2obd(exp);
2753 struct obd_statfs *msfs;
2754 struct ptlrpc_request *req;
2755 struct obd_import *imp = NULL;
2760	/* Since the request might also come from lprocfs, we need to
2761	 * sync this with client_disconnect_export() (Bug 15684). */
2762 down_read(&obd->u.cli.cl_sem);
2763 if (obd->u.cli.cl_import)
2764 imp = class_import_get(obd->u.cli.cl_import);
2765 up_read(&obd->u.cli.cl_sem);
2769 /* We could possibly pass max_age in the request (as an absolute
2770 * timestamp or a "seconds.usec ago") so the target can avoid doing
2771 * extra calls into the filesystem if that isn't necessary (e.g.
2772 * during mount that would help a bit). Having relative timestamps
2773 * is not so great if request processing is slow, while absolute
2774 * timestamps are not ideal because they need time synchronization. */
2775 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2777 class_import_put(imp);
2782 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2784 ptlrpc_request_free(req);
2787 ptlrpc_request_set_replen(req);
2788 req->rq_request_portal = OST_CREATE_PORTAL;
2789 ptlrpc_at_set_req_timeout(req);
2791 if (flags & OBD_STATFS_NODELAY) {
2792		/* procfs requests should not wait for statfs, to avoid a deadlock */
2793 req->rq_no_resend = 1;
2794 req->rq_no_delay = 1;
2797 rc = ptlrpc_queue_wait(req);
2801 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2803 GOTO(out, rc = -EPROTO);
2809 ptlrpc_req_finished(req);
2813 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2814 void *karg, void __user *uarg)
2816 struct obd_device *obd = exp->exp_obd;
2817 struct obd_ioctl_data *data = karg;
2821 if (!try_module_get(THIS_MODULE)) {
2822 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2823 module_name(THIS_MODULE));
2827 case OBD_IOC_CLIENT_RECOVER:
2828 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2829 data->ioc_inlbuf1, 0);
2833 case IOC_OSC_SET_ACTIVE:
2834 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2837 case OBD_IOC_PING_TARGET:
2838 err = ptlrpc_obd_ping(obd);
2841 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2842 cmd, current_comm());
2843 GOTO(out, err = -ENOTTY);
2846 module_put(THIS_MODULE);
2850 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2851 u32 keylen, void *key, u32 vallen, void *val,
2852 struct ptlrpc_request_set *set)
2854 struct ptlrpc_request *req;
2855 struct obd_device *obd = exp->exp_obd;
2856 struct obd_import *imp = class_exp2cliimp(exp);
2861 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2863 if (KEY_IS(KEY_CHECKSUM)) {
2864 if (vallen != sizeof(int))
2866 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2870 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2871 sptlrpc_conf_client_adapt(obd);
2875 if (KEY_IS(KEY_FLUSH_CTX)) {
2876 sptlrpc_import_flush_my_ctx(imp);
2880 if (KEY_IS(KEY_CACHE_SET)) {
2881 struct client_obd *cli = &obd->u.cli;
2883 LASSERT(cli->cl_cache == NULL); /* only once */
2884 cli->cl_cache = (struct cl_client_cache *)val;
2885 cl_cache_incref(cli->cl_cache);
2886 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2888 /* add this osc into entity list */
2889 LASSERT(list_empty(&cli->cl_lru_osc));
2890 spin_lock(&cli->cl_cache->ccc_lru_lock);
2891 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2892 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2897 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2898 struct client_obd *cli = &obd->u.cli;
2899 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2900 long target = *(long *)val;
2902 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2907 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2910	/* We pass all other commands directly to the OST. Since nobody calls OSC
2911	 * methods directly and everybody is supposed to go through LOV, we
2912	 * assume LOV checked invalid values for us.
2913	 * The only recognised values so far are evict_by_nid and mds_conn.
2914	 * Even if something bad goes through, we'd get a -EINVAL from the OST
2917 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2918 &RQF_OST_SET_GRANT_INFO :
2923 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2924 RCL_CLIENT, keylen);
2925 if (!KEY_IS(KEY_GRANT_SHRINK))
2926 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2927 RCL_CLIENT, vallen);
2928 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2930 ptlrpc_request_free(req);
2934 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2935 memcpy(tmp, key, keylen);
2936 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2939 memcpy(tmp, val, vallen);
2941 if (KEY_IS(KEY_GRANT_SHRINK)) {
2942 struct osc_grant_args *aa;
2945 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2946 aa = ptlrpc_req_async_args(req);
2949 ptlrpc_req_finished(req);
2952 *oa = ((struct ost_body *)val)->oa;
2954 req->rq_interpret_reply = osc_shrink_grant_interpret;
2957 ptlrpc_request_set_replen(req);
2958 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2959 LASSERT(set != NULL);
2960 ptlrpc_set_add_req(set, req);
2961 ptlrpc_check_set(NULL, set);
2963 ptlrpcd_add_req(req);
2968 EXPORT_SYMBOL(osc_set_info_async);
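/*
 * Usage sketch (editorial): most keys are handled locally and return
 * without building a request; e.g. toggling checksums on an export
 * needs no request set:
 *
 *	int on = 1;
 *
 *	rc = osc_set_info_async(env, exp, strlen(KEY_CHECKSUM),
 *				KEY_CHECKSUM, sizeof(on), &on, NULL);
 *
 * Keys that fall through to the OST_SET_INFO request above require a
 * non-NULL set, except KEY_GRANT_SHRINK, which is queued on ptlrpcd.
 */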
2970 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2971 struct obd_device *obd, struct obd_uuid *cluuid,
2972 struct obd_connect_data *data, void *localdata)
2974 struct client_obd *cli = &obd->u.cli;
2976 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2980 spin_lock(&cli->cl_loi_list_lock);
2981 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2982 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2983 grant += cli->cl_dirty_grant;
2985 grant += cli->cl_dirty_pages << PAGE_SHIFT;
2986 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2987 lost_grant = cli->cl_lost_grant;
2988 cli->cl_lost_grant = 0;
2989 spin_unlock(&cli->cl_loi_list_lock);
2991 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2992 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2993 data->ocd_version, data->ocd_grant, lost_grant);
2998 EXPORT_SYMBOL(osc_reconnect);
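/*
 * Worked example of the grant reported above (editorial numbers): with
 * cl_avail_grant = 4 MiB, cl_reserved_grant = 1 MiB and 256 dirty pages
 * of 4 KiB (1 MiB) on a connection without OBD_CONNECT_GRANT_PARAM,
 * ocd_grant = 4 + 1 + 1 = 6 MiB; if everything is zero, the client asks
 * for the fallback of 2 * cli_brw_size(obd) instead.
 */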
3000 int osc_disconnect(struct obd_export *exp)
3002 struct obd_device *obd = class_exp2obd(exp);
3005 rc = client_disconnect_export(exp);
3007 * Initially we put del_shrink_grant before disconnect_export, but it
3008 * causes the following problem if setup (connect) and cleanup
3009 * (disconnect) are tangled together.
3010 * connect p1 disconnect p2
3011 * ptlrpc_connect_import
3012 * ............... class_manual_cleanup
3015 * ptlrpc_connect_interrupt
3017 * add this client to shrink list
3019	 * Bang! The grant shrink thread triggers the shrink. BUG18662
3021 osc_del_grant_list(&obd->u.cli);
3024 EXPORT_SYMBOL(osc_disconnect);
3026 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3027 struct hlist_node *hnode, void *arg)
3029 struct lu_env *env = arg;
3030 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3031 struct ldlm_lock *lock;
3032 struct osc_object *osc = NULL;
3036 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3037 if (lock->l_ast_data != NULL && osc == NULL) {
3038 osc = lock->l_ast_data;
3039 cl_object_get(osc2cl(osc));
3042 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3043 * by the 2nd round of ldlm_namespace_clean() call in
3044 * osc_import_event(). */
3045 ldlm_clear_cleaned(lock);
3050 osc_object_invalidate(env, osc);
3051 cl_object_put(env, osc2cl(osc));
3056 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3058 static int osc_import_event(struct obd_device *obd,
3059 struct obd_import *imp,
3060 enum obd_import_event event)
3062 struct client_obd *cli;
3066 LASSERT(imp->imp_obd == obd);
3069 case IMP_EVENT_DISCON: {
3071 spin_lock(&cli->cl_loi_list_lock);
3072 cli->cl_avail_grant = 0;
3073 cli->cl_lost_grant = 0;
3074 spin_unlock(&cli->cl_loi_list_lock);
3077 case IMP_EVENT_INACTIVE: {
3078 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3081 case IMP_EVENT_INVALIDATE: {
3082 struct ldlm_namespace *ns = obd->obd_namespace;
3086 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3088 env = cl_env_get(&refcheck);
3090 osc_io_unplug(env, &obd->u.cli, NULL);
3092 cfs_hash_for_each_nolock(ns->ns_rs_hash,
3093 osc_ldlm_resource_invalidate,
3095 cl_env_put(env, &refcheck);
3097 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3102 case IMP_EVENT_ACTIVE: {
3103 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3106 case IMP_EVENT_OCD: {
3107 struct obd_connect_data *ocd = &imp->imp_connect_data;
3109 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3110 osc_init_grant(&obd->u.cli, ocd);
3113 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3114			imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3116 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3119 case IMP_EVENT_DEACTIVATE: {
3120 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3123 case IMP_EVENT_ACTIVATE: {
3124 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3128 CERROR("Unknown import event %d\n", event);
3135 * Determine whether the lock can be canceled before replaying it
3136 * during recovery; see bug 16774 for detailed information.
3138 * \retval zero the lock can't be canceled
3139 * \retval other ok to cancel
3141 static int osc_cancel_weight(struct ldlm_lock *lock)
3144	 * Cancel all unused, granted extent locks.
3146 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3147 lock->l_granted_mode == lock->l_req_mode &&
3148 osc_ldlm_weigh_ast(lock) == 0)
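/*
 * Editorial illustration (an interpretation of the policy above): a
 * granted, unused extent lock whose weight is zero -- i.e. with no
 * cached state pinned under it, as computed by osc_ldlm_weigh_ast() --
 * is cheaper to re-acquire than to replay, so it is reported as
 * cancelable; anything else is kept for replay.
 */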
3154 static int brw_queue_work(const struct lu_env *env, void *data)
3156 struct client_obd *cli = data;
3158 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3160 osc_io_unplug(env, cli, NULL);
3164 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3166 struct client_obd *cli = &obd->u.cli;
3172 rc = ptlrpcd_addref();
3176 rc = client_obd_setup(obd, lcfg);
3178 GOTO(out_ptlrpcd, rc);
3181 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3182 if (IS_ERR(handler))
3183 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3184 cli->cl_writeback_work = handler;
3186 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3187 if (IS_ERR(handler))
3188 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3189 cli->cl_lru_work = handler;
3191 rc = osc_quota_setup(obd);
3193 GOTO(out_ptlrpcd_work, rc);
3195 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3196 osc_update_next_shrink(cli);
3201 if (cli->cl_writeback_work != NULL) {
3202 ptlrpcd_destroy_work(cli->cl_writeback_work);
3203 cli->cl_writeback_work = NULL;
3205 if (cli->cl_lru_work != NULL) {
3206 ptlrpcd_destroy_work(cli->cl_lru_work);
3207 cli->cl_lru_work = NULL;
3209 client_obd_cleanup(obd);
3214 EXPORT_SYMBOL(osc_setup_common);
3216 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3218 struct client_obd *cli = &obd->u.cli;
3226 rc = osc_setup_common(obd, lcfg);
3230 rc = osc_tunables_init(obd);
3235	 * We try to control the total number of requests with an upper limit,
3236	 * osc_reqpool_maxreqcount. There might be some race which causes
3237	 * over-limit allocation, but it is fine.
3239 req_count = atomic_read(&osc_pool_req_count);
3240 if (req_count < osc_reqpool_maxreqcount) {
3241 adding = cli->cl_max_rpcs_in_flight + 2;
3242 if (req_count + adding > osc_reqpool_maxreqcount)
3243 adding = osc_reqpool_maxreqcount - req_count;
3245 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3246 atomic_add(added, &osc_pool_req_count);
3249 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3251 spin_lock(&osc_shrink_lock);
3252 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3253 spin_unlock(&osc_shrink_lock);
3254 cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3259 int osc_precleanup_common(struct obd_device *obd)
3261 struct client_obd *cli = &obd->u.cli;
3265	 * For the echo client, the export may be on the zombie list; wait for
3266	 * the zombie thread to cull it, because cli.cl_import will be
3267	 * cleared in client_disconnect_export():
3268 * class_export_destroy() -> obd_cleanup() ->
3269 * echo_device_free() -> echo_client_cleanup() ->
3270 * obd_disconnect() -> osc_disconnect() ->
3271 * client_disconnect_export()
3273 obd_zombie_barrier();
3274 if (cli->cl_writeback_work) {
3275 ptlrpcd_destroy_work(cli->cl_writeback_work);
3276 cli->cl_writeback_work = NULL;
3279 if (cli->cl_lru_work) {
3280 ptlrpcd_destroy_work(cli->cl_lru_work);
3281 cli->cl_lru_work = NULL;
3284 obd_cleanup_client_import(obd);
3287 EXPORT_SYMBOL(osc_precleanup_common);
3289 static int osc_precleanup(struct obd_device *obd)
3293 osc_precleanup_common(obd);
3295 ptlrpc_lprocfs_unregister_obd(obd);
3299 int osc_cleanup_common(struct obd_device *obd)
3301 struct client_obd *cli = &obd->u.cli;
3306 spin_lock(&osc_shrink_lock);
3307 list_del(&cli->cl_shrink_list);
3308 spin_unlock(&osc_shrink_lock);
3311 if (cli->cl_cache != NULL) {
3312 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3313 spin_lock(&cli->cl_cache->ccc_lru_lock);
3314 list_del_init(&cli->cl_lru_osc);
3315 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3316 cli->cl_lru_left = NULL;
3317 cl_cache_decref(cli->cl_cache);
3318 cli->cl_cache = NULL;
3321 /* free memory of osc quota cache */
3322 osc_quota_cleanup(obd);
3324 rc = client_obd_cleanup(obd);
3329 EXPORT_SYMBOL(osc_cleanup_common);
3331 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3333 ssize_t count = class_modify_config(lcfg, PARAM_OSC,
3334 &obd->obd_kset.kobj);
3335 return count > 0 ? 0 : count;
3338 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3340 return osc_process_config_base(obd, buf);
3343 static struct obd_ops osc_obd_ops = {
3344 .o_owner = THIS_MODULE,
3345 .o_setup = osc_setup,
3346 .o_precleanup = osc_precleanup,
3347 .o_cleanup = osc_cleanup_common,
3348 .o_add_conn = client_import_add_conn,
3349 .o_del_conn = client_import_del_conn,
3350 .o_connect = client_connect_import,
3351 .o_reconnect = osc_reconnect,
3352 .o_disconnect = osc_disconnect,
3353 .o_statfs = osc_statfs,
3354 .o_statfs_async = osc_statfs_async,
3355 .o_create = osc_create,
3356 .o_destroy = osc_destroy,
3357 .o_getattr = osc_getattr,
3358 .o_setattr = osc_setattr,
3359 .o_iocontrol = osc_iocontrol,
3360 .o_set_info_async = osc_set_info_async,
3361 .o_import_event = osc_import_event,
3362 .o_process_config = osc_process_config,
3363 .o_quotactl = osc_quotactl,
3366 static struct shrinker *osc_cache_shrinker;
3367 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3368 DEFINE_SPINLOCK(osc_shrink_lock);
3370 #ifndef HAVE_SHRINKER_COUNT
3371 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3373 struct shrink_control scv = {
3374 .nr_to_scan = shrink_param(sc, nr_to_scan),
3375 .gfp_mask = shrink_param(sc, gfp_mask)
3377 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3378 struct shrinker *shrinker = NULL;
3381 (void)osc_cache_shrink_scan(shrinker, &scv);
3383 return osc_cache_shrink_count(shrinker, &scv);
3387 static int __init osc_init(void)
3389 bool enable_proc = true;
3390 struct obd_type *type;
3391 unsigned int reqpool_size;
3392 unsigned int reqsize;
3394 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3395 osc_cache_shrink_count, osc_cache_shrink_scan);
3398	/* Print the address of _any_ initialized kernel symbol from this
3399	 * module, to allow debugging with gdb that doesn't support data
3400	 * symbols from modules. */
3401 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3403 rc = lu_kmem_init(osc_caches);
3407 type = class_search_type(LUSTRE_OSP_NAME);
3408 if (type != NULL && type->typ_procsym != NULL)
3409 enable_proc = false;
3411 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3412 LUSTRE_OSC_NAME, &osc_device_type);
3416 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3418	/* This is obviously too much memory; we only prevent overflow here */
3419 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3420 GOTO(out_type, rc = -EINVAL);
3422 reqpool_size = osc_reqpool_mem_max << 20;
3425 while (reqsize < OST_IO_MAXREQSIZE)
3426 reqsize = reqsize << 1;
3429	 * We don't enlarge the request count in the OSC pool according to
3430	 * cl_max_rpcs_in_flight. Allocation from the pool is only tried
3431	 * after a normal allocation has failed, so a small OSC pool won't
3432	 * cause much performance degradation in most cases.
3434 osc_reqpool_maxreqcount = reqpool_size / reqsize;
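	/*
	 * Sizing example (editorial; the concrete OST_IO_MAXREQSIZE value
	 * is an assumption for illustration): with osc_reqpool_mem_max at
	 * its default of 5, reqpool_size is 5 << 20 = 5 MiB.  reqsize is
	 * rounded up to the next power of two not below OST_IO_MAXREQSIZE;
	 * if that yields 64 KiB, then
	 * osc_reqpool_maxreqcount = (5 << 20) / (64 << 10) = 80 requests.
	 */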
3436 atomic_set(&osc_pool_req_count, 0);
3437 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3438 ptlrpc_add_rqs_to_pool);
3440 if (osc_rq_pool == NULL)
3441 GOTO(out_type, rc = -ENOMEM);
3443 rc = osc_start_grant_work();
3445 GOTO(out_req_pool, rc);
3450 ptlrpc_free_rq_pool(osc_rq_pool);
3452 class_unregister_type(LUSTRE_OSC_NAME);
3454 lu_kmem_fini(osc_caches);
3459 static void __exit osc_exit(void)
3461 osc_stop_grant_work();
3462 remove_shrinker(osc_cache_shrinker);
3463 class_unregister_type(LUSTRE_OSC_NAME);
3464 lu_kmem_fini(osc_caches);
3465 ptlrpc_free_rq_pool(osc_rq_pool);
3468 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3469 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3470 MODULE_VERSION(LUSTRE_VERSION_STRING);
3471 MODULE_LICENSE("GPL");
3473 module_init(osc_init);
3474 module_exit(osc_exit);