Whamcloud - gitweb
LU-5823 clio: add cl_object_find_cbdata()
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
54
55 struct osc_brw_async_args {
56         struct obdo              *aa_oa;
57         int                       aa_requested_nob;
58         int                       aa_nio_count;
59         obd_count                 aa_page_count;
60         int                       aa_resends;
61         struct brw_page **aa_ppga;
62         struct client_obd        *aa_cli;
63         struct list_head          aa_oaps;
64         struct list_head          aa_exts;
65         struct obd_capa  *aa_ocapa;
66         struct cl_req            *aa_clerq;
67 };
68
69 #define osc_grant_args osc_brw_async_args
70
71 struct osc_async_args {
72         struct obd_info *aa_oi;
73 };
74
75 struct osc_setattr_args {
76         struct obdo             *sa_oa;
77         obd_enqueue_update_f     sa_upcall;
78         void                    *sa_cookie;
79 };
80
81 struct osc_fsync_args {
82         struct obd_info *fa_oi;
83         obd_enqueue_update_f     fa_upcall;
84         void                    *fa_cookie;
85 };
86
87 struct osc_enqueue_args {
88         struct obd_export       *oa_exp;
89         ldlm_type_t             oa_type;
90         ldlm_mode_t             oa_mode;
91         __u64                   *oa_flags;
92         osc_enqueue_upcall_f    oa_upcall;
93         void                    *oa_cookie;
94         struct ost_lvb          *oa_lvb;
95         struct lustre_handle    oa_lockh;
96         unsigned int            oa_agl:1;
97 };
98
99 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
100 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
101                          void *data, int rc);
102
103 static inline void osc_pack_capa(struct ptlrpc_request *req,
104                                  struct ost_body *body, void *capa)
105 {
106         struct obd_capa *oc = (struct obd_capa *)capa;
107         struct lustre_capa *c;
108
109         if (!capa)
110                 return;
111
112         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
113         LASSERT(c);
114         capa_cpy(c, oc);
115         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
116         DEBUG_CAPA(D_SEC, c, "pack");
117 }
118
119 static inline void osc_pack_req_body(struct ptlrpc_request *req,
120                                      struct obd_info *oinfo)
121 {
122         struct ost_body *body;
123
124         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
125         LASSERT(body);
126
127         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
128                              oinfo->oi_oa);
129         osc_pack_capa(req, body, oinfo->oi_capa);
130 }
131
132 static inline void osc_set_capa_size(struct ptlrpc_request *req,
133                                      const struct req_msg_field *field,
134                                      struct obd_capa *oc)
135 {
136         if (oc == NULL)
137                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
138         else
139                 /* it is already calculated as sizeof struct obd_capa */
140                 ;
141 }
142
143 static int osc_getattr_interpret(const struct lu_env *env,
144                                  struct ptlrpc_request *req,
145                                  struct osc_async_args *aa, int rc)
146 {
147         struct ost_body *body;
148         ENTRY;
149
150         if (rc != 0)
151                 GOTO(out, rc);
152
153         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
154         if (body) {
155                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
156                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
157                                      aa->aa_oi->oi_oa, &body->oa);
158
159                 /* This should really be sent by the OST */
160                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
161                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
162         } else {
163                 CDEBUG(D_INFO, "can't unpack ost_body\n");
164                 rc = -EPROTO;
165                 aa->aa_oi->oi_oa->o_valid = 0;
166         }
167 out:
168         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
169         RETURN(rc);
170 }
171
172 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
173                              struct ptlrpc_request_set *set)
174 {
175         struct ptlrpc_request *req;
176         struct osc_async_args *aa;
177         int                    rc;
178         ENTRY;
179
180         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
181         if (req == NULL)
182                 RETURN(-ENOMEM);
183
184         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
185         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
186         if (rc) {
187                 ptlrpc_request_free(req);
188                 RETURN(rc);
189         }
190
191         osc_pack_req_body(req, oinfo);
192
193         ptlrpc_request_set_replen(req);
194         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
195
196         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
197         aa = ptlrpc_req_async_args(req);
198         aa->aa_oi = oinfo;
199
200         ptlrpc_set_add_req(set, req);
201         RETURN(0);
202 }
203
204 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
205                        struct obd_info *oinfo)
206 {
207         struct ptlrpc_request *req;
208         struct ost_body       *body;
209         int                    rc;
210         ENTRY;
211
212         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
213         if (req == NULL)
214                 RETURN(-ENOMEM);
215
216         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
217         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
218         if (rc) {
219                 ptlrpc_request_free(req);
220                 RETURN(rc);
221         }
222
223         osc_pack_req_body(req, oinfo);
224
225         ptlrpc_request_set_replen(req);
226
227         rc = ptlrpc_queue_wait(req);
228         if (rc)
229                 GOTO(out, rc);
230
231         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
232         if (body == NULL)
233                 GOTO(out, rc = -EPROTO);
234
235         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
236         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
237                              &body->oa);
238
239         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
240         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
241
242         EXIT;
243  out:
244         ptlrpc_req_finished(req);
245         return rc;
246 }
247
248 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
249                        struct obd_info *oinfo, struct obd_trans_info *oti)
250 {
251         struct ptlrpc_request *req;
252         struct ost_body       *body;
253         int                    rc;
254         ENTRY;
255
256         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
257
258         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
259         if (req == NULL)
260                 RETURN(-ENOMEM);
261
262         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
263         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
264         if (rc) {
265                 ptlrpc_request_free(req);
266                 RETURN(rc);
267         }
268
269         osc_pack_req_body(req, oinfo);
270
271         ptlrpc_request_set_replen(req);
272
273         rc = ptlrpc_queue_wait(req);
274         if (rc)
275                 GOTO(out, rc);
276
277         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
278         if (body == NULL)
279                 GOTO(out, rc = -EPROTO);
280
281         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
282                              &body->oa);
283
284         EXIT;
285 out:
286         ptlrpc_req_finished(req);
287         RETURN(rc);
288 }
289
290 static int osc_setattr_interpret(const struct lu_env *env,
291                                  struct ptlrpc_request *req,
292                                  struct osc_setattr_args *sa, int rc)
293 {
294         struct ost_body *body;
295         ENTRY;
296
297         if (rc != 0)
298                 GOTO(out, rc);
299
300         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
301         if (body == NULL)
302                 GOTO(out, rc = -EPROTO);
303
304         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
305                              &body->oa);
306 out:
307         rc = sa->sa_upcall(sa->sa_cookie, rc);
308         RETURN(rc);
309 }
310
311 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
312                            struct obd_trans_info *oti,
313                            obd_enqueue_update_f upcall, void *cookie,
314                            struct ptlrpc_request_set *rqset)
315 {
316         struct ptlrpc_request   *req;
317         struct osc_setattr_args *sa;
318         int                      rc;
319         ENTRY;
320
321         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
322         if (req == NULL)
323                 RETURN(-ENOMEM);
324
325         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
326         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
327         if (rc) {
328                 ptlrpc_request_free(req);
329                 RETURN(rc);
330         }
331
332         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
333                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
334
335         osc_pack_req_body(req, oinfo);
336
337         ptlrpc_request_set_replen(req);
338
339         /* do mds to ost setattr asynchronously */
340         if (!rqset) {
341                 /* Do not wait for response. */
342                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
343         } else {
344                 req->rq_interpret_reply =
345                         (ptlrpc_interpterer_t)osc_setattr_interpret;
346
347                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
348                 sa = ptlrpc_req_async_args(req);
349                 sa->sa_oa = oinfo->oi_oa;
350                 sa->sa_upcall = upcall;
351                 sa->sa_cookie = cookie;
352
353                 if (rqset == PTLRPCD_SET)
354                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
355                 else
356                         ptlrpc_set_add_req(rqset, req);
357         }
358
359         RETURN(0);
360 }
361
362 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
363                              struct obd_trans_info *oti,
364                              struct ptlrpc_request_set *rqset)
365 {
366         return osc_setattr_async_base(exp, oinfo, oti,
367                                       oinfo->oi_cb_up, oinfo, rqset);
368 }
369
370 static int osc_create(const struct lu_env *env, struct obd_export *exp,
371                       struct obdo *oa, struct obd_trans_info *oti)
372 {
373         struct ptlrpc_request *req;
374         struct ost_body       *body;
375         int                    rc;
376         ENTRY;
377
378         LASSERT(oa != NULL);
379         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
380         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
381
382         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
383         if (req == NULL)
384                 GOTO(out, rc = -ENOMEM);
385
386         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
387         if (rc) {
388                 ptlrpc_request_free(req);
389                 GOTO(out, rc);
390         }
391
392         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
393         LASSERT(body);
394
395         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
396
397         ptlrpc_request_set_replen(req);
398
399         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
400             oa->o_flags == OBD_FL_DELORPHAN) {
401                 DEBUG_REQ(D_HA, req,
402                           "delorphan from OST integration");
403                 /* Don't resend the delorphan req */
404                 req->rq_no_resend = req->rq_no_delay = 1;
405         }
406
407         rc = ptlrpc_queue_wait(req);
408         if (rc)
409                 GOTO(out_req, rc);
410
411         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
412         if (body == NULL)
413                 GOTO(out_req, rc = -EPROTO);
414
415         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
416         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
417
418         oa->o_blksize = cli_brw_size(exp->exp_obd);
419         oa->o_valid |= OBD_MD_FLBLKSZ;
420
421         if (oti != NULL) {
422                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
423                         if (oti->oti_logcookies == NULL)
424                                 oti->oti_logcookies = &oti->oti_onecookie;
425
426                         *oti->oti_logcookies = oa->o_lcookie;
427                 }
428         }
429
430         CDEBUG(D_HA, "transno: "LPD64"\n",
431                lustre_msg_get_transno(req->rq_repmsg));
432 out_req:
433         ptlrpc_req_finished(req);
434 out:
435         RETURN(rc);
436 }
437
438 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
439                    obd_enqueue_update_f upcall, void *cookie,
440                    struct ptlrpc_request_set *rqset)
441 {
442         struct ptlrpc_request   *req;
443         struct osc_setattr_args *sa;
444         struct ost_body         *body;
445         int                      rc;
446         ENTRY;
447
448         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
449         if (req == NULL)
450                 RETURN(-ENOMEM);
451
452         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
453         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
454         if (rc) {
455                 ptlrpc_request_free(req);
456                 RETURN(rc);
457         }
458         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
459         ptlrpc_at_set_req_timeout(req);
460
461         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
462         LASSERT(body);
463         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
464                              oinfo->oi_oa);
465         osc_pack_capa(req, body, oinfo->oi_capa);
466
467         ptlrpc_request_set_replen(req);
468
469         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
470         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
471         sa = ptlrpc_req_async_args(req);
472         sa->sa_oa     = oinfo->oi_oa;
473         sa->sa_upcall = upcall;
474         sa->sa_cookie = cookie;
475         if (rqset == PTLRPCD_SET)
476                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
477         else
478                 ptlrpc_set_add_req(rqset, req);
479
480         RETURN(0);
481 }
482
483 static int osc_sync_interpret(const struct lu_env *env,
484                               struct ptlrpc_request *req,
485                               void *arg, int rc)
486 {
487         struct osc_fsync_args *fa = arg;
488         struct ost_body *body;
489         ENTRY;
490
491         if (rc)
492                 GOTO(out, rc);
493
494         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
495         if (body == NULL) {
496                 CERROR ("can't unpack ost_body\n");
497                 GOTO(out, rc = -EPROTO);
498         }
499
500         *fa->fa_oi->oi_oa = body->oa;
501 out:
502         rc = fa->fa_upcall(fa->fa_cookie, rc);
503         RETURN(rc);
504 }
505
506 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
507                   obd_enqueue_update_f upcall, void *cookie,
508                   struct ptlrpc_request_set *rqset)
509 {
510         struct ptlrpc_request *req;
511         struct ost_body       *body;
512         struct osc_fsync_args *fa;
513         int                    rc;
514         ENTRY;
515
516         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
517         if (req == NULL)
518                 RETURN(-ENOMEM);
519
520         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
521         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
522         if (rc) {
523                 ptlrpc_request_free(req);
524                 RETURN(rc);
525         }
526
527         /* overload the size and blocks fields in the oa with start/end */
528         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
529         LASSERT(body);
530         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
531                              oinfo->oi_oa);
532         osc_pack_capa(req, body, oinfo->oi_capa);
533
534         ptlrpc_request_set_replen(req);
535         req->rq_interpret_reply = osc_sync_interpret;
536
537         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
538         fa = ptlrpc_req_async_args(req);
539         fa->fa_oi = oinfo;
540         fa->fa_upcall = upcall;
541         fa->fa_cookie = cookie;
542
543         if (rqset == PTLRPCD_SET)
544                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
545         else
546                 ptlrpc_set_add_req(rqset, req);
547
548         RETURN (0);
549 }
550
551 /* Find and cancel locally locks matched by @mode in the resource found by
552  * @objid. Found locks are added into @cancel list. Returns the amount of
553  * locks added to @cancels list. */
554 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
555                                    struct list_head *cancels,
556                                    ldlm_mode_t mode, __u64 lock_flags)
557 {
558         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
559         struct ldlm_res_id res_id;
560         struct ldlm_resource *res;
561         int count;
562         ENTRY;
563
564         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
565          * export) but disabled through procfs (flag in NS).
566          *
567          * This distinguishes from a case when ELC is not supported originally,
568          * when we still want to cancel locks in advance and just cancel them
569          * locally, without sending any RPC. */
570         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
571                 RETURN(0);
572
573         ostid_build_res_name(&oa->o_oi, &res_id);
574         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
575         if (IS_ERR(res))
576                 RETURN(0);
577
578         LDLM_RESOURCE_ADDREF(res);
579         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
580                                            lock_flags, 0, NULL);
581         LDLM_RESOURCE_DELREF(res);
582         ldlm_resource_putref(res);
583         RETURN(count);
584 }
585
586 static int osc_destroy_interpret(const struct lu_env *env,
587                                  struct ptlrpc_request *req, void *data,
588                                  int rc)
589 {
590         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
591
592         atomic_dec(&cli->cl_destroy_in_flight);
593         wake_up(&cli->cl_destroy_waitq);
594         return 0;
595 }
596
597 static int osc_can_send_destroy(struct client_obd *cli)
598 {
599         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
600             cli->cl_max_rpcs_in_flight) {
601                 /* The destroy request can be sent */
602                 return 1;
603         }
604         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
605             cli->cl_max_rpcs_in_flight) {
606                 /*
607                  * The counter has been modified between the two atomic
608                  * operations.
609                  */
610                 wake_up(&cli->cl_destroy_waitq);
611         }
612         return 0;
613 }
614
615 /* Destroy requests can be async always on the client, and we don't even really
616  * care about the return code since the client cannot do anything at all about
617  * a destroy failure.
618  * When the MDS is unlinking a filename, it saves the file objects into a
619  * recovery llog, and these object records are cancelled when the OST reports
620  * they were destroyed and sync'd to disk (i.e. transaction committed).
621  * If the client dies, or the OST is down when the object should be destroyed,
622  * the records are not cancelled, and when the OST reconnects to the MDS next,
623  * it will retrieve the llog unlink logs and then sends the log cancellation
624  * cookies to the MDS after committing destroy transactions. */
625 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
626                        struct obdo *oa, struct obd_trans_info *oti)
627 {
628         struct client_obd     *cli = &exp->exp_obd->u.cli;
629         struct ptlrpc_request *req;
630         struct ost_body       *body;
631         struct list_head       cancels = LIST_HEAD_INIT(cancels);
632         int rc, count;
633         ENTRY;
634
635         if (!oa) {
636                 CDEBUG(D_INFO, "oa NULL\n");
637                 RETURN(-EINVAL);
638         }
639
640         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
641                                         LDLM_FL_DISCARD_DATA);
642
643         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
644         if (req == NULL) {
645                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
646                 RETURN(-ENOMEM);
647         }
648
649         osc_set_capa_size(req, &RMF_CAPA1, NULL);
650         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
651                                0, &cancels, count);
652         if (rc) {
653                 ptlrpc_request_free(req);
654                 RETURN(rc);
655         }
656
657         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
658         ptlrpc_at_set_req_timeout(req);
659
660         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
661                 oa->o_lcookie = *oti->oti_logcookies;
662         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
663         LASSERT(body);
664         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
665
666         ptlrpc_request_set_replen(req);
667
668         /* If osc_destory is for destroying the unlink orphan,
669          * sent from MDT to OST, which should not be blocked here,
670          * because the process might be triggered by ptlrpcd, and
671          * it is not good to block ptlrpcd thread (b=16006)*/
672         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
673                 req->rq_interpret_reply = osc_destroy_interpret;
674                 if (!osc_can_send_destroy(cli)) {
675                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
676                                                           NULL);
677
678                         /*
679                          * Wait until the number of on-going destroy RPCs drops
680                          * under max_rpc_in_flight
681                          */
682                         l_wait_event_exclusive(cli->cl_destroy_waitq,
683                                                osc_can_send_destroy(cli), &lwi);
684                 }
685         }
686
687         /* Do not wait for response */
688         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
689         RETURN(0);
690 }
691
692 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
693                                 long writing_bytes)
694 {
695         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
696
697         LASSERT(!(oa->o_valid & bits));
698
699         oa->o_valid |= bits;
700         spin_lock(&cli->cl_loi_list_lock);
701         oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
702         if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
703                      cli->cl_dirty_max_pages)) {
704                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
705                        cli->cl_dirty_pages, cli->cl_dirty_transit,
706                        cli->cl_dirty_max_pages);
707                 oa->o_undirty = 0;
708         } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
709                             atomic_long_read(&obd_dirty_transit_pages) >
710                             (obd_max_dirty_pages + 1))) {
711                 /* The atomic_read() allowing the atomic_inc() are
712                  * not covered by a lock thus they may safely race and trip
713                  * this CERROR() unless we add in a small fudge factor (+1). */
714                 CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
715                        cli->cl_import->imp_obd->obd_name,
716                        atomic_long_read(&obd_dirty_pages),
717                        atomic_long_read(&obd_dirty_transit_pages),
718                        obd_max_dirty_pages);
719                 oa->o_undirty = 0;
720         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
721                             0x7fffffff)) {
722                 CERROR("dirty %lu - dirty_max %lu too big???\n",
723                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
724                 oa->o_undirty = 0;
725         } else {
726                 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
727                                       PAGE_CACHE_SHIFT) *
728                                      (cli->cl_max_rpcs_in_flight + 1);
729                 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
730                                     max_in_flight);
731         }
732         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
733         oa->o_dropped = cli->cl_lost_grant;
734         cli->cl_lost_grant = 0;
735         spin_unlock(&cli->cl_loi_list_lock);
736         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
737                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
738
739 }
740
741 void osc_update_next_shrink(struct client_obd *cli)
742 {
743         cli->cl_next_shrink_grant =
744                 cfs_time_shift(cli->cl_grant_shrink_interval);
745         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
746                cli->cl_next_shrink_grant);
747 }
748
749 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
750 {
751         spin_lock(&cli->cl_loi_list_lock);
752         cli->cl_avail_grant += grant;
753         spin_unlock(&cli->cl_loi_list_lock);
754 }
755
756 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
757 {
758         if (body->oa.o_valid & OBD_MD_FLGRANT) {
759                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
760                 __osc_update_grant(cli, body->oa.o_grant);
761         }
762 }
763
764 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
765                               obd_count keylen, void *key, obd_count vallen,
766                               void *val, struct ptlrpc_request_set *set);
767
768 static int osc_shrink_grant_interpret(const struct lu_env *env,
769                                       struct ptlrpc_request *req,
770                                       void *aa, int rc)
771 {
772         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
773         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
774         struct ost_body *body;
775
776         if (rc != 0) {
777                 __osc_update_grant(cli, oa->o_grant);
778                 GOTO(out, rc);
779         }
780
781         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
782         LASSERT(body);
783         osc_update_grant(cli, body);
784 out:
785         OBDO_FREE(oa);
786         return rc;
787 }
788
789 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
790 {
791         spin_lock(&cli->cl_loi_list_lock);
792         oa->o_grant = cli->cl_avail_grant / 4;
793         cli->cl_avail_grant -= oa->o_grant;
794         spin_unlock(&cli->cl_loi_list_lock);
795         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
796                 oa->o_valid |= OBD_MD_FLFLAGS;
797                 oa->o_flags = 0;
798         }
799         oa->o_flags |= OBD_FL_SHRINK_GRANT;
800         osc_update_next_shrink(cli);
801 }
802
803 /* Shrink the current grant, either from some large amount to enough for a
804  * full set of in-flight RPCs, or if we have already shrunk to that limit
805  * then to enough for a single RPC.  This avoids keeping more grant than
806  * needed, and avoids shrinking the grant piecemeal. */
807 static int osc_shrink_grant(struct client_obd *cli)
808 {
809         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
810                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
811
812         spin_lock(&cli->cl_loi_list_lock);
813         if (cli->cl_avail_grant <= target_bytes)
814                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
815         spin_unlock(&cli->cl_loi_list_lock);
816
817         return osc_shrink_grant_to_target(cli, target_bytes);
818 }
819
820 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
821 {
822         int                     rc = 0;
823         struct ost_body        *body;
824         ENTRY;
825
826         spin_lock(&cli->cl_loi_list_lock);
827         /* Don't shrink if we are already above or below the desired limit
828          * We don't want to shrink below a single RPC, as that will negatively
829          * impact block allocation and long-term performance. */
830         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
831                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
832
833         if (target_bytes >= cli->cl_avail_grant) {
834                 spin_unlock(&cli->cl_loi_list_lock);
835                 RETURN(0);
836         }
837         spin_unlock(&cli->cl_loi_list_lock);
838
839         OBD_ALLOC_PTR(body);
840         if (!body)
841                 RETURN(-ENOMEM);
842
843         osc_announce_cached(cli, &body->oa, 0);
844
845         spin_lock(&cli->cl_loi_list_lock);
846         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
847         cli->cl_avail_grant = target_bytes;
848         spin_unlock(&cli->cl_loi_list_lock);
849         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
850                 body->oa.o_valid |= OBD_MD_FLFLAGS;
851                 body->oa.o_flags = 0;
852         }
853         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
854         osc_update_next_shrink(cli);
855
856         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
857                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
858                                 sizeof(*body), body, NULL);
859         if (rc != 0)
860                 __osc_update_grant(cli, body->oa.o_grant);
861         OBD_FREE_PTR(body);
862         RETURN(rc);
863 }
864
865 static int osc_should_shrink_grant(struct client_obd *client)
866 {
867         cfs_time_t time = cfs_time_current();
868         cfs_time_t next_shrink = client->cl_next_shrink_grant;
869
870         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
871              OBD_CONNECT_GRANT_SHRINK) == 0)
872                 return 0;
873
874         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
875                 /* Get the current RPC size directly, instead of going via:
876                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
877                  * Keep comment here so that it can be found by searching. */
878                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
879
880                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
881                     client->cl_avail_grant > brw_size)
882                         return 1;
883                 else
884                         osc_update_next_shrink(client);
885         }
886         return 0;
887 }
888
889 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
890 {
891         struct client_obd *client;
892
893         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
894                 if (osc_should_shrink_grant(client))
895                         osc_shrink_grant(client);
896         }
897         return 0;
898 }
899
900 static int osc_add_shrink_grant(struct client_obd *client)
901 {
902         int rc;
903
904         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
905                                        TIMEOUT_GRANT,
906                                        osc_grant_shrink_grant_cb, NULL,
907                                        &client->cl_grant_shrink_list);
908         if (rc) {
909                 CERROR("add grant client %s error %d\n",
910                         client->cl_import->imp_obd->obd_name, rc);
911                 return rc;
912         }
913         CDEBUG(D_CACHE, "add grant client %s \n",
914                client->cl_import->imp_obd->obd_name);
915         osc_update_next_shrink(client);
916         return 0;
917 }
918
919 static int osc_del_shrink_grant(struct client_obd *client)
920 {
921         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
922                                          TIMEOUT_GRANT);
923 }
924
925 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
926 {
927         /*
928          * ocd_grant is the total grant amount we're expect to hold: if we've
929          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
930          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
931          * dirty.
932          *
933          * race is tolerable here: if we're evicted, but imp_state already
934          * left EVICTED state, then cl_dirty_pages must be 0 already.
935          */
936         spin_lock(&cli->cl_loi_list_lock);
937         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
938                 cli->cl_avail_grant = ocd->ocd_grant;
939         else
940                 cli->cl_avail_grant = ocd->ocd_grant -
941                                       (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
942
943         if (cli->cl_avail_grant < 0) {
944                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
945                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
946                       ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
947                 /* workaround for servers which do not have the patch from
948                  * LU-2679 */
949                 cli->cl_avail_grant = ocd->ocd_grant;
950         }
951
952         /* determine the appropriate chunk size used by osc_extent. */
953         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
954         spin_unlock(&cli->cl_loi_list_lock);
955
956         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
957                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
958                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
959
960         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
961             list_empty(&cli->cl_grant_shrink_list))
962                 osc_add_shrink_grant(cli);
963 }
964
965 /* We assume that the reason this OSC got a short read is because it read
966  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
967  * via the LOV, and it _knows_ it's reading inside the file, it's just that
968  * this stripe never got written at or beyond this stripe offset yet. */
969 static void handle_short_read(int nob_read, obd_count page_count,
970                               struct brw_page **pga)
971 {
972         char *ptr;
973         int i = 0;
974
975         /* skip bytes read OK */
976         while (nob_read > 0) {
977                 LASSERT (page_count > 0);
978
979                 if (pga[i]->count > nob_read) {
980                         /* EOF inside this page */
981                         ptr = kmap(pga[i]->pg) +
982                                 (pga[i]->off & ~CFS_PAGE_MASK);
983                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
984                         kunmap(pga[i]->pg);
985                         page_count--;
986                         i++;
987                         break;
988                 }
989
990                 nob_read -= pga[i]->count;
991                 page_count--;
992                 i++;
993         }
994
995         /* zero remaining pages */
996         while (page_count-- > 0) {
997                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
998                 memset(ptr, 0, pga[i]->count);
999                 kunmap(pga[i]->pg);
1000                 i++;
1001         }
1002 }
1003
1004 static int check_write_rcs(struct ptlrpc_request *req,
1005                            int requested_nob, int niocount,
1006                            obd_count page_count, struct brw_page **pga)
1007 {
1008         int     i;
1009         __u32   *remote_rcs;
1010
1011         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1012                                                   sizeof(*remote_rcs) *
1013                                                   niocount);
1014         if (remote_rcs == NULL) {
1015                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1016                 return(-EPROTO);
1017         }
1018
1019         /* return error if any niobuf was in error */
1020         for (i = 0; i < niocount; i++) {
1021                 if ((int)remote_rcs[i] < 0)
1022                         return(remote_rcs[i]);
1023
1024                 if (remote_rcs[i] != 0) {
1025                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1026                                 i, remote_rcs[i], req);
1027                         return(-EPROTO);
1028                 }
1029         }
1030
1031         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1032                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1033                        req->rq_bulk->bd_nob_transferred, requested_nob);
1034                 return(-EPROTO);
1035         }
1036
1037         return (0);
1038 }
1039
1040 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1041 {
1042         if (p1->flag != p2->flag) {
1043                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1044                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1045                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1046
1047                 /* warn if we try to combine flags that we don't know to be
1048                  * safe to combine */
1049                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1050                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1051                               "report this at https://jira.hpdd.intel.com/\n",
1052                               p1->flag, p2->flag);
1053                 }
1054                 return 0;
1055         }
1056
1057         return (p1->off + p1->count == p2->off);
1058 }
1059
1060 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1061                                    struct brw_page **pga, int opc,
1062                                    cksum_type_t cksum_type)
1063 {
1064         __u32                           cksum;
1065         int                             i = 0;
1066         struct cfs_crypto_hash_desc     *hdesc;
1067         unsigned int                    bufsize;
1068         int                             err;
1069         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1070
1071         LASSERT(pg_count > 0);
1072
1073         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1074         if (IS_ERR(hdesc)) {
1075                 CERROR("Unable to initialize checksum hash %s\n",
1076                        cfs_crypto_hash_name(cfs_alg));
1077                 return PTR_ERR(hdesc);
1078         }
1079
1080         while (nob > 0 && pg_count > 0) {
1081                 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1082
1083                 /* corrupt the data before we compute the checksum, to
1084                  * simulate an OST->client data error */
1085                 if (i == 0 && opc == OST_READ &&
1086                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1087                         unsigned char *ptr = kmap(pga[i]->pg);
1088                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1089
1090                         memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1091                         kunmap(pga[i]->pg);
1092                 }
1093                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1094                                             pga[i]->off & ~CFS_PAGE_MASK,
1095                                             count);
1096                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1097                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1098
1099                 nob -= pga[i]->count;
1100                 pg_count--;
1101                 i++;
1102         }
1103
1104         bufsize = sizeof(cksum);
1105         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1106
1107         /* For sending we only compute the wrong checksum instead
1108          * of corrupting the data so it is still correct on a redo */
1109         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1110                 cksum++;
1111
1112         return cksum;
1113 }
1114
1115 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1116                                 struct lov_stripe_md *lsm, obd_count page_count,
1117                                 struct brw_page **pga,
1118                                 struct ptlrpc_request **reqp,
1119                                 struct obd_capa *ocapa, int reserve,
1120                                 int resend)
1121 {
1122         struct ptlrpc_request   *req;
1123         struct ptlrpc_bulk_desc *desc;
1124         struct ost_body         *body;
1125         struct obd_ioobj        *ioobj;
1126         struct niobuf_remote    *niobuf;
1127         int niocount, i, requested_nob, opc, rc;
1128         struct osc_brw_async_args *aa;
1129         struct req_capsule      *pill;
1130         struct brw_page *pg_prev;
1131
1132         ENTRY;
1133         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1134                 RETURN(-ENOMEM); /* Recoverable */
1135         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1136                 RETURN(-EINVAL); /* Fatal */
1137
1138         if ((cmd & OBD_BRW_WRITE) != 0) {
1139                 opc = OST_WRITE;
1140                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1141                                                 cli->cl_import->imp_rq_pool,
1142                                                 &RQF_OST_BRW_WRITE);
1143         } else {
1144                 opc = OST_READ;
1145                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1146         }
1147         if (req == NULL)
1148                 RETURN(-ENOMEM);
1149
1150         for (niocount = i = 1; i < page_count; i++) {
1151                 if (!can_merge_pages(pga[i - 1], pga[i]))
1152                         niocount++;
1153         }
1154
1155         pill = &req->rq_pill;
1156         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1157                              sizeof(*ioobj));
1158         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1159                              niocount * sizeof(*niobuf));
1160         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1161
1162         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1163         if (rc) {
1164                 ptlrpc_request_free(req);
1165                 RETURN(rc);
1166         }
1167         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1168         ptlrpc_at_set_req_timeout(req);
1169         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1170          * retry logic */
1171         req->rq_no_retry_einprogress = 1;
1172
1173         desc = ptlrpc_prep_bulk_imp(req, page_count,
1174                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1175                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1176                 OST_BULK_PORTAL);
1177
1178         if (desc == NULL)
1179                 GOTO(out, rc = -ENOMEM);
1180         /* NB request now owns desc and will free it when it gets freed */
1181
1182         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1183         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1184         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1185         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1186
1187         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1188
1189         obdo_to_ioobj(oa, ioobj);
1190         ioobj->ioo_bufcnt = niocount;
1191         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1192          * that might be send for this request.  The actual number is decided
1193          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1194          * "max - 1" for old client compatibility sending "0", and also so the
1195          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1196         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1197         osc_pack_capa(req, body, ocapa);
1198         LASSERT(page_count > 0);
1199         pg_prev = pga[0];
1200         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1201                 struct brw_page *pg = pga[i];
1202                 int poff = pg->off & ~CFS_PAGE_MASK;
1203
1204                 LASSERT(pg->count > 0);
1205                 /* make sure there is no gap in the middle of page array */
1206                 LASSERTF(page_count == 1 ||
1207                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1208                           ergo(i > 0 && i < page_count - 1,
1209                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1210                           ergo(i == page_count - 1, poff == 0)),
1211                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1212                          i, page_count, pg, pg->off, pg->count);
1213                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1214                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1215                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1216                          i, page_count,
1217                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1218                          pg_prev->pg, page_private(pg_prev->pg),
1219                          pg_prev->pg->index, pg_prev->off);
1220                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1221                         (pg->flag & OBD_BRW_SRVLOCK));
1222
1223                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1224                 requested_nob += pg->count;
1225
1226                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1227                         niobuf--;
1228                         niobuf->rnb_len += pg->count;
1229                 } else {
1230                         niobuf->rnb_offset = pg->off;
1231                         niobuf->rnb_len    = pg->count;
1232                         niobuf->rnb_flags  = pg->flag;
1233                 }
1234                 pg_prev = pg;
1235         }
1236
1237         LASSERTF((void *)(niobuf - niocount) ==
1238                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1239                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1240                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1241
1242         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1243         if (resend) {
1244                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1245                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1246                         body->oa.o_flags = 0;
1247                 }
1248                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1249         }
1250
1251         if (osc_should_shrink_grant(cli))
1252                 osc_shrink_grant_local(cli, &body->oa);
1253
1254         /* size[REQ_REC_OFF] still sizeof (*body) */
1255         if (opc == OST_WRITE) {
1256                 if (cli->cl_checksum &&
1257                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1258                         /* store cl_cksum_type in a local variable since
1259                          * it can be changed via lprocfs */
1260                         cksum_type_t cksum_type = cli->cl_cksum_type;
1261
1262                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1263                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1264                                 body->oa.o_flags = 0;
1265                         }
1266                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1267                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1268                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1269                                                              page_count, pga,
1270                                                              OST_WRITE,
1271                                                              cksum_type);
1272                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1273                                body->oa.o_cksum);
1274                         /* save this in 'oa', too, for later checking */
1275                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1276                         oa->o_flags |= cksum_type_pack(cksum_type);
1277                 } else {
1278                         /* clear out the checksum flag, in case this is a
1279                          * resend but cl_checksum is no longer set. b=11238 */
1280                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1281                 }
1282                 oa->o_cksum = body->oa.o_cksum;
1283                 /* 1 RC per niobuf */
1284                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1285                                      sizeof(__u32) * niocount);
1286         } else {
1287                 if (cli->cl_checksum &&
1288                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1289                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1290                                 body->oa.o_flags = 0;
1291                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1292                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1293                 }
1294         }
1295         ptlrpc_request_set_replen(req);
1296
1297         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1298         aa = ptlrpc_req_async_args(req);
1299         aa->aa_oa = oa;
1300         aa->aa_requested_nob = requested_nob;
1301         aa->aa_nio_count = niocount;
1302         aa->aa_page_count = page_count;
1303         aa->aa_resends = 0;
1304         aa->aa_ppga = pga;
1305         aa->aa_cli = cli;
1306         INIT_LIST_HEAD(&aa->aa_oaps);
1307         if (ocapa && reserve)
1308                 aa->aa_ocapa = capa_get(ocapa);
1309
1310         *reqp = req;
1311         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1312         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1313                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1314                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1315         RETURN(0);
1316
1317  out:
1318         ptlrpc_req_finished(req);
1319         RETURN(rc);
1320 }
1321
1322 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1323                                 __u32 client_cksum, __u32 server_cksum, int nob,
1324                                 obd_count page_count, struct brw_page **pga,
1325                                 cksum_type_t client_cksum_type)
1326 {
1327         __u32 new_cksum;
1328         char *msg;
1329         cksum_type_t cksum_type;
1330
1331         if (server_cksum == client_cksum) {
1332                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1333                 return 0;
1334         }
1335
1336         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1337                                        oa->o_flags : 0);
1338         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1339                                       cksum_type);
1340
1341         if (cksum_type != client_cksum_type)
1342                 msg = "the server did not use the checksum type specified in "
1343                       "the original request - likely a protocol problem";
1344         else if (new_cksum == server_cksum)
1345                 msg = "changed on the client after we checksummed it - "
1346                       "likely false positive due to mmap IO (bug 11742)";
1347         else if (new_cksum == client_cksum)
1348                 msg = "changed in transit before arrival at OST";
1349         else
1350                 msg = "changed in transit AND doesn't match the original - "
1351                       "likely false positive due to mmap IO (bug 11742)";
1352
1353         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1354                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1355                            msg, libcfs_nid2str(peer->nid),
1356                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1357                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1358                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1359                            POSTID(&oa->o_oi), pga[0]->off,
1360                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1361         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1362                "client csum now %x\n", client_cksum, client_cksum_type,
1363                server_cksum, cksum_type, new_cksum);
1364         return 1;
1365 }
1366
1367 /* Note rc enters this function as number of bytes transferred */
1368 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1369 {
1370         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1371         const lnet_process_id_t *peer =
1372                         &req->rq_import->imp_connection->c_peer;
1373         struct client_obd *cli = aa->aa_cli;
1374         struct ost_body *body;
1375         u32 client_cksum = 0;
1376         ENTRY;
1377
1378         if (rc < 0 && rc != -EDQUOT) {
1379                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1380                 RETURN(rc);
1381         }
1382
1383         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1384         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1385         if (body == NULL) {
1386                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1387                 RETURN(-EPROTO);
1388         }
1389
1390         /* set/clear over quota flag for a uid/gid */
1391         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1392             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1393                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1394
1395                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1396                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1397                        body->oa.o_flags);
1398                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1399         }
1400
1401         osc_update_grant(cli, body);
1402
1403         if (rc < 0)
1404                 RETURN(rc);
1405
1406         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1407                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1408
1409         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1410                 if (rc > 0) {
1411                         CERROR("Unexpected +ve rc %d\n", rc);
1412                         RETURN(-EPROTO);
1413                 }
1414                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1415
1416                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1417                         RETURN(-EAGAIN);
1418
1419                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1420                     check_write_checksum(&body->oa, peer, client_cksum,
1421                                          body->oa.o_cksum, aa->aa_requested_nob,
1422                                          aa->aa_page_count, aa->aa_ppga,
1423                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1424                         RETURN(-EAGAIN);
1425
1426                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1427                                      aa->aa_page_count, aa->aa_ppga);
1428                 GOTO(out, rc);
1429         }
1430
1431         /* The rest of this function executes only for OST_READs */
1432
1433         /* if unwrap_bulk failed, return -EAGAIN to retry */
1434         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1435         if (rc < 0)
1436                 GOTO(out, rc = -EAGAIN);
1437
1438         if (rc > aa->aa_requested_nob) {
1439                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1440                        aa->aa_requested_nob);
1441                 RETURN(-EPROTO);
1442         }
1443
1444         if (rc != req->rq_bulk->bd_nob_transferred) {
1445                 CERROR ("Unexpected rc %d (%d transferred)\n",
1446                         rc, req->rq_bulk->bd_nob_transferred);
1447                 return (-EPROTO);
1448         }
1449
1450         if (rc < aa->aa_requested_nob)
1451                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1452
1453         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1454                 static int cksum_counter;
1455                 u32        server_cksum = body->oa.o_cksum;
1456                 char      *via;
1457                 char      *router;
1458                 cksum_type_t cksum_type;
1459
1460                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1461                                                body->oa.o_flags : 0);
1462                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1463                                                  aa->aa_ppga, OST_READ,
1464                                                  cksum_type);
1465
1466                 if (peer->nid == req->rq_bulk->bd_sender) {
1467                         via = router = "";
1468                 } else {
1469                         via = " via ";
1470                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1471                 }
1472
1473                 if (server_cksum != client_cksum) {
1474                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1475                                            "%s%s%s inode "DFID" object "DOSTID
1476                                            " extent ["LPU64"-"LPU64"]\n",
1477                                            req->rq_import->imp_obd->obd_name,
1478                                            libcfs_nid2str(peer->nid),
1479                                            via, router,
1480                                            body->oa.o_valid & OBD_MD_FLFID ?
1481                                                 body->oa.o_parent_seq : (__u64)0,
1482                                            body->oa.o_valid & OBD_MD_FLFID ?
1483                                                 body->oa.o_parent_oid : 0,
1484                                            body->oa.o_valid & OBD_MD_FLFID ?
1485                                                 body->oa.o_parent_ver : 0,
1486                                            POSTID(&body->oa.o_oi),
1487                                            aa->aa_ppga[0]->off,
1488                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1489                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1490                                                                         1);
1491                         CERROR("client %x, server %x, cksum_type %x\n",
1492                                client_cksum, server_cksum, cksum_type);
1493                         cksum_counter = 0;
1494                         aa->aa_oa->o_cksum = client_cksum;
1495                         rc = -EAGAIN;
1496                 } else {
1497                         cksum_counter++;
1498                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1499                         rc = 0;
1500                 }
1501         } else if (unlikely(client_cksum)) {
1502                 static int cksum_missed;
1503
1504                 cksum_missed++;
1505                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1506                         CERROR("Checksum %u requested from %s but not sent\n",
1507                                cksum_missed, libcfs_nid2str(peer->nid));
1508         } else {
1509                 rc = 0;
1510         }
1511 out:
1512         if (rc >= 0)
1513                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1514                                      aa->aa_oa, &body->oa);
1515
1516         RETURN(rc);
1517 }
1518
1519 static int osc_brw_redo_request(struct ptlrpc_request *request,
1520                                 struct osc_brw_async_args *aa, int rc)
1521 {
1522         struct ptlrpc_request *new_req;
1523         struct osc_brw_async_args *new_aa;
1524         struct osc_async_page *oap;
1525         ENTRY;
1526
1527         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1528                   "redo for recoverable error %d", rc);
1529
1530         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1531                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1532                                   aa->aa_cli, aa->aa_oa,
1533                                   NULL /* lsm unused by osc currently */,
1534                                   aa->aa_page_count, aa->aa_ppga,
1535                                   &new_req, aa->aa_ocapa, 0, 1);
1536         if (rc)
1537                 RETURN(rc);
1538
1539         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1540                 if (oap->oap_request != NULL) {
1541                         LASSERTF(request == oap->oap_request,
1542                                  "request %p != oap_request %p\n",
1543                                  request, oap->oap_request);
1544                         if (oap->oap_interrupted) {
1545                                 ptlrpc_req_finished(new_req);
1546                                 RETURN(-EINTR);
1547                         }
1548                 }
1549         }
1550         /* New request takes over pga and oaps from old request.
1551          * Note that copying a list_head doesn't work, need to move it... */
1552         aa->aa_resends++;
1553         new_req->rq_interpret_reply = request->rq_interpret_reply;
1554         new_req->rq_async_args = request->rq_async_args;
1555         new_req->rq_commit_cb = request->rq_commit_cb;
1556         /* cap resend delay to the current request timeout, this is similar to
1557          * what ptlrpc does (see after_reply()) */
1558         if (aa->aa_resends > new_req->rq_timeout)
1559                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1560         else
1561                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1562         new_req->rq_generation_set = 1;
1563         new_req->rq_import_generation = request->rq_import_generation;
1564
1565         new_aa = ptlrpc_req_async_args(new_req);
1566
1567         INIT_LIST_HEAD(&new_aa->aa_oaps);
1568         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1569         INIT_LIST_HEAD(&new_aa->aa_exts);
1570         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1571         new_aa->aa_resends = aa->aa_resends;
1572
1573         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1574                 if (oap->oap_request) {
1575                         ptlrpc_req_finished(oap->oap_request);
1576                         oap->oap_request = ptlrpc_request_addref(new_req);
1577                 }
1578         }
1579
1580         new_aa->aa_ocapa = aa->aa_ocapa;
1581         aa->aa_ocapa = NULL;
1582
1583         /* XXX: This code will run into problem if we're going to support
1584          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1585          * and wait for all of them to be finished. We should inherit request
1586          * set from old request. */
1587         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1588
1589         DEBUG_REQ(D_INFO, new_req, "new request");
1590         RETURN(0);
1591 }
1592
1593 /*
1594  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1595  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1596  * fine for our small page arrays and doesn't require allocation.  its an
1597  * insertion sort that swaps elements that are strides apart, shrinking the
1598  * stride down until its '1' and the array is sorted.
1599  */
1600 static void sort_brw_pages(struct brw_page **array, int num)
1601 {
1602         int stride, i, j;
1603         struct brw_page *tmp;
1604
1605         if (num == 1)
1606                 return;
1607         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1608                 ;
1609
1610         do {
1611                 stride /= 3;
1612                 for (i = stride ; i < num ; i++) {
1613                         tmp = array[i];
1614                         j = i;
1615                         while (j >= stride && array[j - stride]->off > tmp->off) {
1616                                 array[j] = array[j - stride];
1617                                 j -= stride;
1618                         }
1619                         array[j] = tmp;
1620                 }
1621         } while (stride > 1);
1622 }
1623
1624 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1625 {
1626         LASSERT(ppga != NULL);
1627         OBD_FREE(ppga, sizeof(*ppga) * count);
1628 }
1629
1630 static int brw_interpret(const struct lu_env *env,
1631                          struct ptlrpc_request *req, void *data, int rc)
1632 {
1633         struct osc_brw_async_args *aa = data;
1634         struct osc_extent *ext;
1635         struct osc_extent *tmp;
1636         struct client_obd *cli = aa->aa_cli;
1637         ENTRY;
1638
1639         rc = osc_brw_fini_request(req, rc);
1640         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1641         /* When server return -EINPROGRESS, client should always retry
1642          * regardless of the number of times the bulk was resent already. */
1643         if (osc_recoverable_error(rc)) {
1644                 if (req->rq_import_generation !=
1645                     req->rq_import->imp_generation) {
1646                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1647                                ""DOSTID", rc = %d.\n",
1648                                req->rq_import->imp_obd->obd_name,
1649                                POSTID(&aa->aa_oa->o_oi), rc);
1650                 } else if (rc == -EINPROGRESS ||
1651                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1652                         rc = osc_brw_redo_request(req, aa, rc);
1653                 } else {
1654                         CERROR("%s: too many resent retries for object: "
1655                                ""LPU64":"LPU64", rc = %d.\n",
1656                                req->rq_import->imp_obd->obd_name,
1657                                POSTID(&aa->aa_oa->o_oi), rc);
1658                 }
1659
1660                 if (rc == 0)
1661                         RETURN(0);
1662                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1663                         rc = -EIO;
1664         }
1665
1666         if (aa->aa_ocapa) {
1667                 capa_put(aa->aa_ocapa);
1668                 aa->aa_ocapa = NULL;
1669         }
1670
1671         if (rc == 0) {
1672                 struct obdo *oa = aa->aa_oa;
1673                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1674                 unsigned long valid = 0;
1675                 struct cl_object *obj;
1676                 struct osc_async_page *last;
1677
1678                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1679                 obj = osc2cl(last->oap_obj);
1680
1681                 cl_object_attr_lock(obj);
1682                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1683                         attr->cat_blocks = oa->o_blocks;
1684                         valid |= CAT_BLOCKS;
1685                 }
1686                 if (oa->o_valid & OBD_MD_FLMTIME) {
1687                         attr->cat_mtime = oa->o_mtime;
1688                         valid |= CAT_MTIME;
1689                 }
1690                 if (oa->o_valid & OBD_MD_FLATIME) {
1691                         attr->cat_atime = oa->o_atime;
1692                         valid |= CAT_ATIME;
1693                 }
1694                 if (oa->o_valid & OBD_MD_FLCTIME) {
1695                         attr->cat_ctime = oa->o_ctime;
1696                         valid |= CAT_CTIME;
1697                 }
1698
1699                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1700                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1701                         loff_t last_off = last->oap_count + last->oap_obj_off +
1702                                 last->oap_page_off;
1703
1704                         /* Change file size if this is an out of quota or
1705                          * direct IO write and it extends the file size */
1706                         if (loi->loi_lvb.lvb_size < last_off) {
1707                                 attr->cat_size = last_off;
1708                                 valid |= CAT_SIZE;
1709                         }
1710                         /* Extend KMS if it's not a lockless write */
1711                         if (loi->loi_kms < last_off &&
1712                             oap2osc_page(last)->ops_srvlock == 0) {
1713                                 attr->cat_kms = last_off;
1714                                 valid |= CAT_KMS;
1715                         }
1716                 }
1717
1718                 if (valid != 0)
1719                         cl_object_attr_update(env, obj, attr, valid);
1720                 cl_object_attr_unlock(obj);
1721         }
1722         OBDO_FREE(aa->aa_oa);
1723
1724         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1725                 osc_inc_unstable_pages(req);
1726
1727         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1728                 list_del_init(&ext->oe_link);
1729                 osc_extent_finish(env, ext, 1, rc);
1730         }
1731         LASSERT(list_empty(&aa->aa_exts));
1732         LASSERT(list_empty(&aa->aa_oaps));
1733
1734         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1735                           req->rq_bulk->bd_nob_transferred);
1736         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1737         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1738
1739         spin_lock(&cli->cl_loi_list_lock);
1740         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1741          * is called so we know whether to go to sync BRWs or wait for more
1742          * RPCs to complete */
1743         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1744                 cli->cl_w_in_flight--;
1745         else
1746                 cli->cl_r_in_flight--;
1747         osc_wake_cache_waiters(cli);
1748         spin_unlock(&cli->cl_loi_list_lock);
1749
1750         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1751         RETURN(rc);
1752 }
1753
1754 static void brw_commit(struct ptlrpc_request *req)
1755 {
1756         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1757          * this called via the rq_commit_cb, I need to ensure
1758          * osc_dec_unstable_pages is still called. Otherwise unstable
1759          * pages may be leaked. */
1760         spin_lock(&req->rq_lock);
1761         if (likely(req->rq_unstable)) {
1762                 req->rq_unstable = 0;
1763                 spin_unlock(&req->rq_lock);
1764
1765                 osc_dec_unstable_pages(req);
1766         } else {
1767                 req->rq_committed = 1;
1768                 spin_unlock(&req->rq_lock);
1769         }
1770 }
1771
1772 /**
1773  * Build an RPC by the list of extent @ext_list. The caller must ensure
1774  * that the total pages in this list are NOT over max pages per RPC.
1775  * Extents in the list must be in OES_RPC state.
1776  */
1777 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1778                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
1779 {
1780         struct ptlrpc_request           *req = NULL;
1781         struct osc_extent               *ext;
1782         struct brw_page                 **pga = NULL;
1783         struct osc_brw_async_args       *aa = NULL;
1784         struct obdo                     *oa = NULL;
1785         struct osc_async_page           *oap;
1786         struct osc_async_page           *tmp;
1787         struct cl_req                   *clerq = NULL;
1788         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1789                                                                       CRT_READ;
1790         struct cl_req_attr              *crattr = NULL;
1791         obd_off                         starting_offset = OBD_OBJECT_EOF;
1792         obd_off                         ending_offset = 0;
1793         int                             mpflag = 0;
1794         int                             mem_tight = 0;
1795         int                             page_count = 0;
1796         bool                            soft_sync = false;
1797         int                             i;
1798         int                             rc;
1799         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1800         struct ost_body                 *body;
1801         ENTRY;
1802         LASSERT(!list_empty(ext_list));
1803
1804         /* add pages into rpc_list to build BRW rpc */
1805         list_for_each_entry(ext, ext_list, oe_link) {
1806                 LASSERT(ext->oe_state == OES_RPC);
1807                 mem_tight |= ext->oe_memalloc;
1808                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1809                         ++page_count;
1810                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1811                         if (starting_offset > oap->oap_obj_off)
1812                                 starting_offset = oap->oap_obj_off;
1813                         else
1814                                 LASSERT(oap->oap_page_off == 0);
1815                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1816                                 ending_offset = oap->oap_obj_off +
1817                                                 oap->oap_count;
1818                         else
1819                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1820                                         PAGE_CACHE_SIZE);
1821                 }
1822         }
1823
1824         soft_sync = osc_over_unstable_soft_limit(cli);
1825         if (mem_tight)
1826                 mpflag = cfs_memory_pressure_get_and_set();
1827
1828         OBD_ALLOC(crattr, sizeof(*crattr));
1829         if (crattr == NULL)
1830                 GOTO(out, rc = -ENOMEM);
1831
1832         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1833         if (pga == NULL)
1834                 GOTO(out, rc = -ENOMEM);
1835
1836         OBDO_ALLOC(oa);
1837         if (oa == NULL)
1838                 GOTO(out, rc = -ENOMEM);
1839
1840         i = 0;
1841         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1842                 struct cl_page *page = oap2cl_page(oap);
1843                 if (clerq == NULL) {
1844                         clerq = cl_req_alloc(env, page, crt,
1845                                              1 /* only 1-object rpcs for now */);
1846                         if (IS_ERR(clerq))
1847                                 GOTO(out, rc = PTR_ERR(clerq));
1848                 }
1849                 if (mem_tight)
1850                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1851                 if (soft_sync)
1852                         oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1853                 pga[i] = &oap->oap_brw_page;
1854                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1855                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1856                        pga[i]->pg, page_index(oap->oap_page), oap,
1857                        pga[i]->flag);
1858                 i++;
1859                 cl_req_page_add(env, clerq, page);
1860         }
1861
1862         /* always get the data for the obdo for the rpc */
1863         LASSERT(clerq != NULL);
1864         crattr->cra_oa = oa;
1865         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1866
1867         rc = cl_req_prep(env, clerq);
1868         if (rc != 0) {
1869                 CERROR("cl_req_prep failed: %d\n", rc);
1870                 GOTO(out, rc);
1871         }
1872
1873         sort_brw_pages(pga, page_count);
1874         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1875                         pga, &req, crattr->cra_capa, 1, 0);
1876         if (rc != 0) {
1877                 CERROR("prep_req failed: %d\n", rc);
1878                 GOTO(out, rc);
1879         }
1880
1881         req->rq_commit_cb = brw_commit;
1882         req->rq_interpret_reply = brw_interpret;
1883
1884         if (mem_tight != 0)
1885                 req->rq_memalloc = 1;
1886
1887         /* Need to update the timestamps after the request is built in case
1888          * we race with setattr (locally or in queue at OST).  If OST gets
1889          * later setattr before earlier BRW (as determined by the request xid),
1890          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1891          * way to do this in a single call.  bug 10150 */
1892         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1893         crattr->cra_oa = &body->oa;
1894         cl_req_attr_set(env, clerq, crattr,
1895                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1896
1897         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1898
1899         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1900         aa = ptlrpc_req_async_args(req);
1901         INIT_LIST_HEAD(&aa->aa_oaps);
1902         list_splice_init(&rpc_list, &aa->aa_oaps);
1903         INIT_LIST_HEAD(&aa->aa_exts);
1904         list_splice_init(ext_list, &aa->aa_exts);
1905         aa->aa_clerq = clerq;
1906
1907         /* queued sync pages can be torn down while the pages
1908          * were between the pending list and the rpc */
1909         tmp = NULL;
1910         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1911                 /* only one oap gets a request reference */
1912                 if (tmp == NULL)
1913                         tmp = oap;
1914                 if (oap->oap_interrupted && !req->rq_intr) {
1915                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1916                                         oap, req);
1917                         ptlrpc_mark_interrupted(req);
1918                 }
1919         }
1920         if (tmp != NULL)
1921                 tmp->oap_request = ptlrpc_request_addref(req);
1922
1923         spin_lock(&cli->cl_loi_list_lock);
1924         starting_offset >>= PAGE_CACHE_SHIFT;
1925         if (cmd == OBD_BRW_READ) {
1926                 cli->cl_r_in_flight++;
1927                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1928                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1929                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1930                                       starting_offset + 1);
1931         } else {
1932                 cli->cl_w_in_flight++;
1933                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1934                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1935                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1936                                       starting_offset + 1);
1937         }
1938         spin_unlock(&cli->cl_loi_list_lock);
1939
1940         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1941                   page_count, aa, cli->cl_r_in_flight,
1942                   cli->cl_w_in_flight);
1943
1944         /* XXX: Maybe the caller can check the RPC bulk descriptor to
1945          * see which CPU/NUMA node the majority of pages were allocated
1946          * on, and try to assign the async RPC to the CPU core
1947          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
1948          *
1949          * But on the other hand, we expect that multiple ptlrpcd
1950          * threads and the initial write sponsor can run in parallel,
1951          * especially when data checksum is enabled, which is CPU-bound
1952          * operation and single ptlrpcd thread cannot process in time.
1953          * So more ptlrpcd threads sharing BRW load
1954          * (with PDL_POLICY_ROUND) seems better.
1955          */
1956         ptlrpcd_add_req(req, pol, -1);
1957         rc = 0;
1958         EXIT;
1959
1960 out:
1961         if (mem_tight != 0)
1962                 cfs_memory_pressure_restore(mpflag);
1963
1964         if (crattr != NULL) {
1965                 capa_put(crattr->cra_capa);
1966                 OBD_FREE(crattr, sizeof(*crattr));
1967         }
1968
1969         if (rc != 0) {
1970                 LASSERT(req == NULL);
1971
1972                 if (oa)
1973                         OBDO_FREE(oa);
1974                 if (pga)
1975                         OBD_FREE(pga, sizeof(*pga) * page_count);
1976                 /* this should happen rarely and is pretty bad, it makes the
1977                  * pending list not follow the dirty order */
1978                 while (!list_empty(ext_list)) {
1979                         ext = list_entry(ext_list->next, struct osc_extent,
1980                                          oe_link);
1981                         list_del_init(&ext->oe_link);
1982                         osc_extent_finish(env, ext, 0, rc);
1983                 }
1984                 if (clerq && !IS_ERR(clerq))
1985                         cl_req_completion(env, clerq, rc);
1986         }
1987         RETURN(rc);
1988 }
1989
1990 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1991                                         struct ldlm_enqueue_info *einfo)
1992 {
1993         void *data = einfo->ei_cbdata;
1994         int set = 0;
1995
1996         LASSERT(lock != NULL);
1997         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1998         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1999         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2000         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2001
2002         lock_res_and_lock(lock);
2003
2004         if (lock->l_ast_data == NULL)
2005                 lock->l_ast_data = data;
2006         if (lock->l_ast_data == data)
2007                 set = 1;
2008
2009         unlock_res_and_lock(lock);
2010
2011         return set;
2012 }
2013
2014 static int osc_set_data_with_check(struct lustre_handle *lockh,
2015                                    struct ldlm_enqueue_info *einfo)
2016 {
2017         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2018         int set = 0;
2019
2020         if (lock != NULL) {
2021                 set = osc_set_lock_data_with_check(lock, einfo);
2022                 LDLM_LOCK_PUT(lock);
2023         } else
2024                 CERROR("lockh %p, data %p - client evicted?\n",
2025                        lockh, einfo->ei_cbdata);
2026         return set;
2027 }
2028
2029 static int osc_enqueue_fini(struct ptlrpc_request *req,
2030                             osc_enqueue_upcall_f upcall, void *cookie,
2031                             struct lustre_handle *lockh, ldlm_mode_t mode,
2032                             __u64 *flags, int agl, int errcode)
2033 {
2034         bool intent = *flags & LDLM_FL_HAS_INTENT;
2035         int rc;
2036         ENTRY;
2037
2038         /* The request was created before ldlm_cli_enqueue call. */
2039         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2040                 struct ldlm_reply *rep;
2041
2042                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2043                 LASSERT(rep != NULL);
2044
2045                 rep->lock_policy_res1 =
2046                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2047                 if (rep->lock_policy_res1)
2048                         errcode = rep->lock_policy_res1;
2049                 if (!agl)
2050                         *flags |= LDLM_FL_LVB_READY;
2051         } else if (errcode == ELDLM_OK) {
2052                 *flags |= LDLM_FL_LVB_READY;
2053         }
2054
2055         /* Call the update callback. */
2056         rc = (*upcall)(cookie, lockh, errcode);
2057
2058         /* release the reference taken in ldlm_cli_enqueue() */
2059         if (errcode == ELDLM_LOCK_MATCHED)
2060                 errcode = ELDLM_OK;
2061         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2062                 ldlm_lock_decref(lockh, mode);
2063
2064         RETURN(rc);
2065 }
2066
2067 static int osc_enqueue_interpret(const struct lu_env *env,
2068                                  struct ptlrpc_request *req,
2069                                  struct osc_enqueue_args *aa, int rc)
2070 {
2071         struct ldlm_lock *lock;
2072         struct lustre_handle *lockh = &aa->oa_lockh;
2073         ldlm_mode_t mode = aa->oa_mode;
2074         struct ost_lvb *lvb = aa->oa_lvb;
2075         __u32 lvb_len = sizeof(*lvb);
2076         __u64 flags = 0;
2077
2078         ENTRY;
2079
2080         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2081          * be valid. */
2082         lock = ldlm_handle2lock(lockh);
2083         LASSERTF(lock != NULL,
2084                  "lockh "LPX64", req %p, aa %p - client evicted?\n",
2085                  lockh->cookie, req, aa);
2086
2087         /* Take an additional reference so that a blocking AST that
2088          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2089          * to arrive after an upcall has been executed by
2090          * osc_enqueue_fini(). */
2091         ldlm_lock_addref(lockh, mode);
2092
2093         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2094         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2095
2096         /* Let CP AST to grant the lock first. */
2097         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2098
2099         if (aa->oa_agl) {
2100                 LASSERT(aa->oa_lvb == NULL);
2101                 LASSERT(aa->oa_flags == NULL);
2102                 aa->oa_flags = &flags;
2103         }
2104
2105         /* Complete obtaining the lock procedure. */
2106         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2107                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2108                                    lockh, rc);
2109         /* Complete osc stuff. */
2110         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2111                               aa->oa_flags, aa->oa_agl, rc);
2112
2113         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2114
2115         ldlm_lock_decref(lockh, mode);
2116         LDLM_LOCK_PUT(lock);
2117         RETURN(rc);
2118 }
2119
2120 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2121
2122 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2123  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2124  * other synchronous requests, however keeping some locks and trying to obtain
2125  * others may take a considerable amount of time in a case of ost failure; and
2126  * when other sync requests do not get released lock from a client, the client
2127  * is evicted from the cluster -- such scenarious make the life difficult, so
2128  * release locks just after they are obtained. */
2129 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2130                      __u64 *flags, ldlm_policy_data_t *policy,
2131                      struct ost_lvb *lvb, int kms_valid,
2132                      osc_enqueue_upcall_f upcall, void *cookie,
2133                      struct ldlm_enqueue_info *einfo,
2134                      struct ptlrpc_request_set *rqset, int async, int agl)
2135 {
2136         struct obd_device *obd = exp->exp_obd;
2137         struct lustre_handle lockh = { 0 };
2138         struct ptlrpc_request *req = NULL;
2139         int intent = *flags & LDLM_FL_HAS_INTENT;
2140         __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
2141         ldlm_mode_t mode;
2142         int rc;
2143         ENTRY;
2144
2145         /* Filesystem lock extents are extended to page boundaries so that
2146          * dealing with the page cache is a little smoother.  */
2147         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2148         policy->l_extent.end |= ~CFS_PAGE_MASK;
2149
2150         /*
2151          * kms is not valid when either object is completely fresh (so that no
2152          * locks are cached), or object was evicted. In the latter case cached
2153          * lock cannot be used, because it would prime inode state with
2154          * potentially stale LVB.
2155          */
2156         if (!kms_valid)
2157                 goto no_match;
2158
2159         /* Next, search for already existing extent locks that will cover us */
2160         /* If we're trying to read, we also search for an existing PW lock.  The
2161          * VFS and page cache already protect us locally, so lots of readers/
2162          * writers can share a single PW lock.
2163          *
2164          * There are problems with conversion deadlocks, so instead of
2165          * converting a read lock to a write lock, we'll just enqueue a new
2166          * one.
2167          *
2168          * At some point we should cancel the read lock instead of making them
2169          * send us a blocking callback, but there are problems with canceling
2170          * locks out from other users right now, too. */
2171         mode = einfo->ei_mode;
2172         if (einfo->ei_mode == LCK_PR)
2173                 mode |= LCK_PW;
2174         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2175                                einfo->ei_type, policy, mode, &lockh, 0);
2176         if (mode) {
2177                 struct ldlm_lock *matched;
2178
2179                 if (*flags & LDLM_FL_TEST_LOCK)
2180                         RETURN(ELDLM_OK);
2181
2182                 matched = ldlm_handle2lock(&lockh);
2183                 if (agl) {
2184                         /* AGL enqueues DLM locks speculatively. Therefore if
2185                          * it already exists a DLM lock, it wll just inform the
2186                          * caller to cancel the AGL process for this stripe. */
2187                         ldlm_lock_decref(&lockh, mode);
2188                         LDLM_LOCK_PUT(matched);
2189                         RETURN(-ECANCELED);
2190                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2191                         *flags |= LDLM_FL_LVB_READY;
2192
2193                         /* We already have a lock, and it's referenced. */
2194                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2195
2196                         ldlm_lock_decref(&lockh, mode);
2197                         LDLM_LOCK_PUT(matched);
2198                         RETURN(ELDLM_OK);
2199                 } else {
2200                         ldlm_lock_decref(&lockh, mode);
2201                         LDLM_LOCK_PUT(matched);
2202                 }
2203         }
2204
2205 no_match:
2206         if (*flags & LDLM_FL_TEST_LOCK)
2207                 RETURN(-ENOLCK);
2208
2209         if (intent) {
2210                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2211                                            &RQF_LDLM_ENQUEUE_LVB);
2212                 if (req == NULL)
2213                         RETURN(-ENOMEM);
2214
2215                 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2216                 if (rc < 0) {
2217                         ptlrpc_request_free(req);
2218                         RETURN(rc);
2219                 }
2220
2221                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2222                                      sizeof *lvb);
2223                 ptlrpc_request_set_replen(req);
2224         }
2225
2226         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2227         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2228
2229         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2230                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2231         if (async) {
2232                 if (!rc) {
2233                         struct osc_enqueue_args *aa;
2234                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2235                         aa = ptlrpc_req_async_args(req);
2236                         aa->oa_exp    = exp;
2237                         aa->oa_mode   = einfo->ei_mode;
2238                         aa->oa_type   = einfo->ei_type;
2239                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2240                         aa->oa_upcall = upcall;
2241                         aa->oa_cookie = cookie;
2242                         aa->oa_agl    = !!agl;
2243                         if (!agl) {
2244                                 aa->oa_flags  = flags;
2245                                 aa->oa_lvb    = lvb;
2246                         } else {
2247                                 /* AGL is essentially to enqueue an DLM lock
2248                                  * in advance, so we don't care about the
2249                                  * result of AGL enqueue. */
2250                                 aa->oa_lvb    = NULL;
2251                                 aa->oa_flags  = NULL;
2252                         }
2253
2254                         req->rq_interpret_reply =
2255                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2256                         if (rqset == PTLRPCD_SET)
2257                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2258                         else
2259                                 ptlrpc_set_add_req(rqset, req);
2260                 } else if (intent) {
2261                         ptlrpc_req_finished(req);
2262                 }
2263                 RETURN(rc);
2264         }
2265
2266         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2267                               flags, agl, rc);
2268         if (intent)
2269                 ptlrpc_req_finished(req);
2270
2271         RETURN(rc);
2272 }
2273
2274 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2275                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2276                    __u64 *flags, void *data, struct lustre_handle *lockh,
2277                    int unref)
2278 {
2279         struct obd_device *obd = exp->exp_obd;
2280         __u64 lflags = *flags;
2281         ldlm_mode_t rc;
2282         ENTRY;
2283
2284         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2285                 RETURN(-EIO);
2286
2287         /* Filesystem lock extents are extended to page boundaries so that
2288          * dealing with the page cache is a little smoother */
2289         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2290         policy->l_extent.end |= ~CFS_PAGE_MASK;
2291
2292         /* Next, search for already existing extent locks that will cover us */
2293         /* If we're trying to read, we also search for an existing PW lock.  The
2294          * VFS and page cache already protect us locally, so lots of readers/
2295          * writers can share a single PW lock. */
2296         rc = mode;
2297         if (mode == LCK_PR)
2298                 rc |= LCK_PW;
2299         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2300                              res_id, type, policy, rc, lockh, unref);
2301         if (rc) {
2302                 if (data != NULL) {
2303                         if (!osc_set_data_with_check(lockh, data)) {
2304                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2305                                         ldlm_lock_decref(lockh, rc);
2306                                 RETURN(0);
2307                         }
2308                 }
2309                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2310                         ldlm_lock_addref(lockh, LCK_PR);
2311                         ldlm_lock_decref(lockh, LCK_PW);
2312                 }
2313                 RETURN(rc);
2314         }
2315         RETURN(rc);
2316 }
2317
2318 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2319 {
2320         ENTRY;
2321
2322         if (unlikely(mode == LCK_GROUP))
2323                 ldlm_lock_decref_and_cancel(lockh, mode);
2324         else
2325                 ldlm_lock_decref(lockh, mode);
2326
2327         RETURN(0);
2328 }
2329
2330 static int osc_statfs_interpret(const struct lu_env *env,
2331                                 struct ptlrpc_request *req,
2332                                 struct osc_async_args *aa, int rc)
2333 {
2334         struct obd_statfs *msfs;
2335         ENTRY;
2336
2337         if (rc == -EBADR)
2338                 /* The request has in fact never been sent
2339                  * due to issues at a higher level (LOV).
2340                  * Exit immediately since the caller is
2341                  * aware of the problem and takes care
2342                  * of the clean up */
2343                  RETURN(rc);
2344
2345         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2346             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2347                 GOTO(out, rc = 0);
2348
2349         if (rc != 0)
2350                 GOTO(out, rc);
2351
2352         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2353         if (msfs == NULL) {
2354                 GOTO(out, rc = -EPROTO);
2355         }
2356
2357         *aa->aa_oi->oi_osfs = *msfs;
2358 out:
2359         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2360         RETURN(rc);
2361 }
2362
2363 static int osc_statfs_async(struct obd_export *exp,
2364                             struct obd_info *oinfo, __u64 max_age,
2365                             struct ptlrpc_request_set *rqset)
2366 {
2367         struct obd_device     *obd = class_exp2obd(exp);
2368         struct ptlrpc_request *req;
2369         struct osc_async_args *aa;
2370         int                    rc;
2371         ENTRY;
2372
2373         /* We could possibly pass max_age in the request (as an absolute
2374          * timestamp or a "seconds.usec ago") so the target can avoid doing
2375          * extra calls into the filesystem if that isn't necessary (e.g.
2376          * during mount that would help a bit).  Having relative timestamps
2377          * is not so great if request processing is slow, while absolute
2378          * timestamps are not ideal because they need time synchronization. */
2379         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2380         if (req == NULL)
2381                 RETURN(-ENOMEM);
2382
2383         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2384         if (rc) {
2385                 ptlrpc_request_free(req);
2386                 RETURN(rc);
2387         }
2388         ptlrpc_request_set_replen(req);
2389         req->rq_request_portal = OST_CREATE_PORTAL;
2390         ptlrpc_at_set_req_timeout(req);
2391
2392         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2393                 /* procfs requests not want stat in wait for avoid deadlock */
2394                 req->rq_no_resend = 1;
2395                 req->rq_no_delay = 1;
2396         }
2397
2398         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2399         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2400         aa = ptlrpc_req_async_args(req);
2401         aa->aa_oi = oinfo;
2402
2403         ptlrpc_set_add_req(rqset, req);
2404         RETURN(0);
2405 }
2406
2407 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2408                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2409 {
2410         struct obd_device     *obd = class_exp2obd(exp);
2411         struct obd_statfs     *msfs;
2412         struct ptlrpc_request *req;
2413         struct obd_import     *imp = NULL;
2414         int rc;
2415         ENTRY;
2416
2417         /*Since the request might also come from lprocfs, so we need
2418          *sync this with client_disconnect_export Bug15684*/
2419         down_read(&obd->u.cli.cl_sem);
2420         if (obd->u.cli.cl_import)
2421                 imp = class_import_get(obd->u.cli.cl_import);
2422         up_read(&obd->u.cli.cl_sem);
2423         if (!imp)
2424                 RETURN(-ENODEV);
2425
2426         /* We could possibly pass max_age in the request (as an absolute
2427          * timestamp or a "seconds.usec ago") so the target can avoid doing
2428          * extra calls into the filesystem if that isn't necessary (e.g.
2429          * during mount that would help a bit).  Having relative timestamps
2430          * is not so great if request processing is slow, while absolute
2431          * timestamps are not ideal because they need time synchronization. */
2432         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2433
2434         class_import_put(imp);
2435
2436         if (req == NULL)
2437                 RETURN(-ENOMEM);
2438
2439         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2440         if (rc) {
2441                 ptlrpc_request_free(req);
2442                 RETURN(rc);
2443         }
2444         ptlrpc_request_set_replen(req);
2445         req->rq_request_portal = OST_CREATE_PORTAL;
2446         ptlrpc_at_set_req_timeout(req);
2447
2448         if (flags & OBD_STATFS_NODELAY) {
2449                 /* procfs requests not want stat in wait for avoid deadlock */
2450                 req->rq_no_resend = 1;
2451                 req->rq_no_delay = 1;
2452         }
2453
2454         rc = ptlrpc_queue_wait(req);
2455         if (rc)
2456                 GOTO(out, rc);
2457
2458         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2459         if (msfs == NULL) {
2460                 GOTO(out, rc = -EPROTO);
2461         }
2462
2463         *osfs = *msfs;
2464
2465         EXIT;
2466  out:
2467         ptlrpc_req_finished(req);
2468         return rc;
2469 }
2470
2471 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2472                          void *karg, void *uarg)
2473 {
2474         struct obd_device *obd = exp->exp_obd;
2475         struct obd_ioctl_data *data = karg;
2476         int err = 0;
2477         ENTRY;
2478
2479         if (!try_module_get(THIS_MODULE)) {
2480                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2481                        module_name(THIS_MODULE));
2482                 return -EINVAL;
2483         }
2484         switch (cmd) {
2485         case OBD_IOC_CLIENT_RECOVER:
2486                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2487                                             data->ioc_inlbuf1, 0);
2488                 if (err > 0)
2489                         err = 0;
2490                 GOTO(out, err);
2491         case IOC_OSC_SET_ACTIVE:
2492                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2493                                                data->ioc_offset);
2494                 GOTO(out, err);
2495         case OBD_IOC_POLL_QUOTACHECK:
2496                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2497                 GOTO(out, err);
2498         case OBD_IOC_PING_TARGET:
2499                 err = ptlrpc_obd_ping(obd);
2500                 GOTO(out, err);
2501         default:
2502                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2503                        cmd, current_comm());
2504                 GOTO(out, err = -ENOTTY);
2505         }
2506 out:
2507         module_put(THIS_MODULE);
2508         return err;
2509 }
2510
2511 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2512                         obd_count keylen, void *key, __u32 *vallen, void *val,
2513                         struct lov_stripe_md *lsm)
2514 {
2515         ENTRY;
2516         if (!vallen || !val)
2517                 RETURN(-EFAULT);
2518
2519         if (KEY_IS(KEY_FIEMAP)) {
2520                 struct ll_fiemap_info_key *fm_key =
2521                                 (struct ll_fiemap_info_key *)key;
2522                 struct ldlm_res_id       res_id;
2523                 ldlm_policy_data_t       policy;
2524                 struct lustre_handle     lockh;
2525                 ldlm_mode_t              mode = 0;
2526                 struct ptlrpc_request   *req;
2527                 struct ll_user_fiemap   *reply;
2528                 char                    *tmp;
2529                 int                      rc;
2530
2531                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2532                         goto skip_locking;
2533
2534                 policy.l_extent.start = fm_key->fiemap.fm_start &
2535                                                 CFS_PAGE_MASK;
2536
2537                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2538                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2539                         policy.l_extent.end = OBD_OBJECT_EOF;
2540                 else
2541                         policy.l_extent.end = (fm_key->fiemap.fm_start +
2542                                 fm_key->fiemap.fm_length +
2543                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2544
2545                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2546                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2547                                        LDLM_FL_BLOCK_GRANTED |
2548                                        LDLM_FL_LVB_READY,
2549                                        &res_id, LDLM_EXTENT, &policy,
2550                                        LCK_PR | LCK_PW, &lockh, 0);
2551                 if (mode) { /* lock is cached on client */
2552                         if (mode != LCK_PR) {
2553                                 ldlm_lock_addref(&lockh, LCK_PR);
2554                                 ldlm_lock_decref(&lockh, LCK_PW);
2555                         }
2556                 } else { /* no cached lock, needs acquire lock on server side */
2557                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2558                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2559                 }
2560
2561 skip_locking:
2562                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2563                                            &RQF_OST_GET_INFO_FIEMAP);
2564                 if (req == NULL)
2565                         GOTO(drop_lock, rc = -ENOMEM);
2566
2567                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2568                                      RCL_CLIENT, keylen);
2569                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2570                                      RCL_CLIENT, *vallen);
2571                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2572                                      RCL_SERVER, *vallen);
2573
2574                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2575                 if (rc) {
2576                         ptlrpc_request_free(req);
2577                         GOTO(drop_lock, rc);
2578                 }
2579
2580                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2581                 memcpy(tmp, key, keylen);
2582                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2583                 memcpy(tmp, val, *vallen);
2584
2585                 ptlrpc_request_set_replen(req);
2586                 rc = ptlrpc_queue_wait(req);
2587                 if (rc)
2588                         GOTO(fini_req, rc);
2589
2590                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2591                 if (reply == NULL)
2592                         GOTO(fini_req, rc = -EPROTO);
2593
2594                 memcpy(val, reply, *vallen);
2595 fini_req:
2596                 ptlrpc_req_finished(req);
2597 drop_lock:
2598                 if (mode)
2599                         ldlm_lock_decref(&lockh, LCK_PR);
2600                 RETURN(rc);
2601         }
2602
2603         RETURN(-EINVAL);
2604 }
2605
2606 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2607                               obd_count keylen, void *key, obd_count vallen,
2608                               void *val, struct ptlrpc_request_set *set)
2609 {
2610         struct ptlrpc_request *req;
2611         struct obd_device     *obd = exp->exp_obd;
2612         struct obd_import     *imp = class_exp2cliimp(exp);
2613         char                  *tmp;
2614         int                    rc;
2615         ENTRY;
2616
2617         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2618
2619         if (KEY_IS(KEY_CHECKSUM)) {
2620                 if (vallen != sizeof(int))
2621                         RETURN(-EINVAL);
2622                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2623                 RETURN(0);
2624         }
2625
2626         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2627                 sptlrpc_conf_client_adapt(obd);
2628                 RETURN(0);
2629         }
2630
2631         if (KEY_IS(KEY_FLUSH_CTX)) {
2632                 sptlrpc_import_flush_my_ctx(imp);
2633                 RETURN(0);
2634         }
2635
2636         if (KEY_IS(KEY_CACHE_SET)) {
2637                 struct client_obd *cli = &obd->u.cli;
2638
2639                 LASSERT(cli->cl_cache == NULL); /* only once */
2640                 cli->cl_cache = (struct cl_client_cache *)val;
2641                 atomic_inc(&cli->cl_cache->ccc_users);
2642                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2643
2644                 /* add this osc into entity list */
2645                 LASSERT(list_empty(&cli->cl_lru_osc));
2646                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2647                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2648                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2649
2650                 RETURN(0);
2651         }
2652
2653         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2654                 struct client_obd *cli = &obd->u.cli;
2655                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2656                 long target = *(long *)val;
2657
2658                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2659                 *(long *)val -= nr;
2660                 RETURN(0);
2661         }
2662
2663         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2664                 RETURN(-EINVAL);
2665
2666         /* We pass all other commands directly to OST. Since nobody calls osc
2667            methods directly and everybody is supposed to go through LOV, we
2668            assume lov checked invalid values for us.
2669            The only recognised values so far are evict_by_nid and mds_conn.
2670            Even if something bad goes through, we'd get a -EINVAL from OST
2671            anyway. */
2672
2673         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2674                                                 &RQF_OST_SET_GRANT_INFO :
2675                                                 &RQF_OBD_SET_INFO);
2676         if (req == NULL)
2677                 RETURN(-ENOMEM);
2678
2679         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2680                              RCL_CLIENT, keylen);
2681         if (!KEY_IS(KEY_GRANT_SHRINK))
2682                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2683                                      RCL_CLIENT, vallen);
2684         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2685         if (rc) {
2686                 ptlrpc_request_free(req);
2687                 RETURN(rc);
2688         }
2689
2690         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2691         memcpy(tmp, key, keylen);
2692         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2693                                                         &RMF_OST_BODY :
2694                                                         &RMF_SETINFO_VAL);
2695         memcpy(tmp, val, vallen);
2696
2697         if (KEY_IS(KEY_GRANT_SHRINK)) {
2698                 struct osc_grant_args *aa;
2699                 struct obdo *oa;
2700
2701                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2702                 aa = ptlrpc_req_async_args(req);
2703                 OBDO_ALLOC(oa);
2704                 if (!oa) {
2705                         ptlrpc_req_finished(req);
2706                         RETURN(-ENOMEM);
2707                 }
2708                 *oa = ((struct ost_body *)val)->oa;
2709                 aa->aa_oa = oa;
2710                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2711         }
2712
2713         ptlrpc_request_set_replen(req);
2714         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2715                 LASSERT(set != NULL);
2716                 ptlrpc_set_add_req(set, req);
2717                 ptlrpc_check_set(NULL, set);
2718         } else
2719                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2720
2721         RETURN(0);
2722 }
2723
2724 static int osc_reconnect(const struct lu_env *env,
2725                          struct obd_export *exp, struct obd_device *obd,
2726                          struct obd_uuid *cluuid,
2727                          struct obd_connect_data *data,
2728                          void *localdata)
2729 {
2730         struct client_obd *cli = &obd->u.cli;
2731
2732         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2733                 long lost_grant;
2734
2735                 spin_lock(&cli->cl_loi_list_lock);
2736                 data->ocd_grant = (cli->cl_avail_grant +
2737                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2738                                   2 * cli_brw_size(obd);
2739                 lost_grant = cli->cl_lost_grant;
2740                 cli->cl_lost_grant = 0;
2741                 spin_unlock(&cli->cl_loi_list_lock);
2742
2743                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2744                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2745                        data->ocd_version, data->ocd_grant, lost_grant);
2746         }
2747
2748         RETURN(0);
2749 }
2750
2751 static int osc_disconnect(struct obd_export *exp)
2752 {
2753         struct obd_device *obd = class_exp2obd(exp);
2754         int rc;
2755
2756         rc = client_disconnect_export(exp);
2757         /**
2758          * Initially we put del_shrink_grant before disconnect_export, but it
2759          * causes the following problem if setup (connect) and cleanup
2760          * (disconnect) are tangled together.
2761          *      connect p1                     disconnect p2
2762          *   ptlrpc_connect_import
2763          *     ...............               class_manual_cleanup
2764          *                                     osc_disconnect
2765          *                                     del_shrink_grant
2766          *   ptlrpc_connect_interrupt
2767          *     init_grant_shrink
2768          *   add this client to shrink list
2769          *                                      cleanup_osc
2770          * Bang! pinger trigger the shrink.
2771          * So the osc should be disconnected from the shrink list, after we
2772          * are sure the import has been destroyed. BUG18662
2773          */
2774         if (obd->u.cli.cl_import == NULL)
2775                 osc_del_shrink_grant(&obd->u.cli);
2776         return rc;
2777 }
2778
2779 static int osc_import_event(struct obd_device *obd,
2780                             struct obd_import *imp,
2781                             enum obd_import_event event)
2782 {
2783         struct client_obd *cli;
2784         int rc = 0;
2785
2786         ENTRY;
2787         LASSERT(imp->imp_obd == obd);
2788
2789         switch (event) {
2790         case IMP_EVENT_DISCON: {
2791                 cli = &obd->u.cli;
2792                 spin_lock(&cli->cl_loi_list_lock);
2793                 cli->cl_avail_grant = 0;
2794                 cli->cl_lost_grant = 0;
2795                 spin_unlock(&cli->cl_loi_list_lock);
2796                 break;
2797         }
2798         case IMP_EVENT_INACTIVE: {
2799                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2800                 break;
2801         }
2802         case IMP_EVENT_INVALIDATE: {
2803                 struct ldlm_namespace *ns = obd->obd_namespace;
2804                 struct lu_env         *env;
2805                 int                    refcheck;
2806
2807                 env = cl_env_get(&refcheck);
2808                 if (!IS_ERR(env)) {
2809                         /* Reset grants */
2810                         cli = &obd->u.cli;
2811                         /* all pages go to failing rpcs due to the invalid
2812                          * import */
2813                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
2814
2815                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2816                         cl_env_put(env, &refcheck);
2817                 } else
2818                         rc = PTR_ERR(env);
2819                 break;
2820         }
2821         case IMP_EVENT_ACTIVE: {
2822                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2823                 break;
2824         }
2825         case IMP_EVENT_OCD: {
2826                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2827
2828                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2829                         osc_init_grant(&obd->u.cli, ocd);
2830
2831                 /* See bug 7198 */
2832                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2833                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2834
2835                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2836                 break;
2837         }
2838         case IMP_EVENT_DEACTIVATE: {
2839                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2840                 break;
2841         }
2842         case IMP_EVENT_ACTIVATE: {
2843                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2844                 break;
2845         }
2846         default:
2847                 CERROR("Unknown import event %d\n", event);
2848                 LBUG();
2849         }
2850         RETURN(rc);
2851 }
2852
2853 /**
2854  * Determine whether the lock can be canceled before replaying the lock
2855  * during recovery, see bug16774 for detailed information.
2856  *
2857  * \retval zero the lock can't be canceled
2858  * \retval other ok to cancel
2859  */
2860 static int osc_cancel_weight(struct ldlm_lock *lock)
2861 {
2862         /*
2863          * Cancel all unused and granted extent lock.
2864          */
2865         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2866             lock->l_granted_mode == lock->l_req_mode &&
2867             osc_ldlm_weigh_ast(lock) == 0)
2868                 RETURN(1);
2869
2870         RETURN(0);
2871 }
2872
2873 static int brw_queue_work(const struct lu_env *env, void *data)
2874 {
2875         struct client_obd *cli = data;
2876
2877         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2878
2879         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2880         RETURN(0);
2881 }
2882
2883 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2884 {
2885         struct client_obd *cli = &obd->u.cli;
2886         struct obd_type   *type;
2887         void              *handler;
2888         int                rc;
2889         ENTRY;
2890
2891         rc = ptlrpcd_addref();
2892         if (rc)
2893                 RETURN(rc);
2894
2895         rc = client_obd_setup(obd, lcfg);
2896         if (rc)
2897                 GOTO(out_ptlrpcd, rc);
2898
2899         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2900         if (IS_ERR(handler))
2901                 GOTO(out_client_setup, rc = PTR_ERR(handler));
2902         cli->cl_writeback_work = handler;
2903
2904         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2905         if (IS_ERR(handler))
2906                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2907         cli->cl_lru_work = handler;
2908
2909         rc = osc_quota_setup(obd);
2910         if (rc)
2911                 GOTO(out_ptlrpcd_work, rc);
2912
2913         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2914
2915 #ifdef CONFIG_PROC_FS
2916         obd->obd_vars = lprocfs_osc_obd_vars;
2917 #endif
2918         /* If this is true then both client (osc) and server (osp) are on the
2919          * same node. The osp layer if loaded first will register the osc proc
2920          * directory. In that case this obd_device will be attached its proc
2921          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2922         type = class_search_type(LUSTRE_OSP_NAME);
2923         if (type && type->typ_procsym) {
2924                 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2925                                                        type->typ_procsym,
2926                                                        obd->obd_vars, obd);
2927                 if (IS_ERR(obd->obd_proc_entry)) {
2928                         rc = PTR_ERR(obd->obd_proc_entry);
2929                         CERROR("error %d setting up lprocfs for %s\n", rc,
2930                                obd->obd_name);
2931                         obd->obd_proc_entry = NULL;
2932                 }
2933         } else {
2934                 rc = lprocfs_obd_setup(obd);
2935         }
2936
2937         /* If the basic OSC proc tree construction succeeded then
2938          * lets do the rest. */
2939         if (rc == 0) {
2940                 lproc_osc_attach_seqstat(obd);
2941                 sptlrpc_lprocfs_cliobd_attach(obd);
2942                 ptlrpc_lprocfs_register_obd(obd);
2943         }
2944
2945         /* We need to allocate a few requests more, because
2946          * brw_interpret tries to create new requests before freeing
2947          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
2948          * reserved, but I'm afraid that might be too much wasted RAM
2949          * in fact, so 2 is just my guess and still should work. */
2950         cli->cl_import->imp_rq_pool =
2951                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
2952                                     OST_MAXREQSIZE,
2953                                     ptlrpc_add_rqs_to_pool);
2954
2955         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2956         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2957         RETURN(0);
2958
2959 out_ptlrpcd_work:
2960         if (cli->cl_writeback_work != NULL) {
2961                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2962                 cli->cl_writeback_work = NULL;
2963         }
2964         if (cli->cl_lru_work != NULL) {
2965                 ptlrpcd_destroy_work(cli->cl_lru_work);
2966                 cli->cl_lru_work = NULL;
2967         }
2968 out_client_setup:
2969         client_obd_cleanup(obd);
2970 out_ptlrpcd:
2971         ptlrpcd_decref();
2972         RETURN(rc);
2973 }
2974
2975 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2976 {
2977         int rc = 0;
2978         ENTRY;
2979
2980         switch (stage) {
2981         case OBD_CLEANUP_EARLY: {
2982                 struct obd_import *imp;
2983                 imp = obd->u.cli.cl_import;
2984                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
2985                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
2986                 ptlrpc_deactivate_import(imp);
2987                 spin_lock(&imp->imp_lock);
2988                 imp->imp_pingable = 0;
2989                 spin_unlock(&imp->imp_lock);
2990                 break;
2991         }
2992         case OBD_CLEANUP_EXPORTS: {
2993                 struct client_obd *cli = &obd->u.cli;
2994                 /* LU-464
2995                  * for echo client, export may be on zombie list, wait for
2996                  * zombie thread to cull it, because cli.cl_import will be
2997                  * cleared in client_disconnect_export():
2998                  *   class_export_destroy() -> obd_cleanup() ->
2999                  *   echo_device_free() -> echo_client_cleanup() ->
3000                  *   obd_disconnect() -> osc_disconnect() ->
3001                  *   client_disconnect_export()
3002                  */
3003                 obd_zombie_barrier();
3004                 if (cli->cl_writeback_work) {
3005                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3006                         cli->cl_writeback_work = NULL;
3007                 }
3008                 if (cli->cl_lru_work) {
3009                         ptlrpcd_destroy_work(cli->cl_lru_work);
3010                         cli->cl_lru_work = NULL;
3011                 }
3012                 obd_cleanup_client_import(obd);
3013                 ptlrpc_lprocfs_unregister_obd(obd);
3014                 lprocfs_obd_cleanup(obd);
3015                 break;
3016                 }
3017         }
3018         RETURN(rc);
3019 }
3020
3021 int osc_cleanup(struct obd_device *obd)
3022 {
3023         struct client_obd *cli = &obd->u.cli;
3024         int rc;
3025
3026         ENTRY;
3027
3028         /* lru cleanup */
3029         if (cli->cl_cache != NULL) {
3030                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3031                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3032                 list_del_init(&cli->cl_lru_osc);
3033                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3034                 cli->cl_lru_left = NULL;
3035                 atomic_dec(&cli->cl_cache->ccc_users);
3036                 cli->cl_cache = NULL;
3037         }
3038
3039         /* free memory of osc quota cache */
3040         osc_quota_cleanup(obd);
3041
3042         rc = client_obd_cleanup(obd);
3043
3044         ptlrpcd_decref();
3045         RETURN(rc);
3046 }
3047
3048 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3049 {
3050         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3051         return rc > 0 ? 0: rc;
3052 }
3053
3054 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3055 {
3056         return osc_process_config_base(obd, buf);
3057 }
3058
3059 static struct obd_ops osc_obd_ops = {
3060         .o_owner                = THIS_MODULE,
3061         .o_setup                = osc_setup,
3062         .o_precleanup           = osc_precleanup,
3063         .o_cleanup              = osc_cleanup,
3064         .o_add_conn             = client_import_add_conn,
3065         .o_del_conn             = client_import_del_conn,
3066         .o_connect              = client_connect_import,
3067         .o_reconnect            = osc_reconnect,
3068         .o_disconnect           = osc_disconnect,
3069         .o_statfs               = osc_statfs,
3070         .o_statfs_async         = osc_statfs_async,
3071         .o_create               = osc_create,
3072         .o_destroy              = osc_destroy,
3073         .o_getattr              = osc_getattr,
3074         .o_getattr_async        = osc_getattr_async,
3075         .o_setattr              = osc_setattr,
3076         .o_setattr_async        = osc_setattr_async,
3077         .o_iocontrol            = osc_iocontrol,
3078         .o_get_info             = osc_get_info,
3079         .o_set_info_async       = osc_set_info_async,
3080         .o_import_event         = osc_import_event,
3081         .o_process_config       = osc_process_config,
3082         .o_quotactl             = osc_quotactl,
3083         .o_quotacheck           = osc_quotacheck,
3084 };
3085
3086 static int __init osc_init(void)
3087 {
3088         bool enable_proc = true;
3089         struct obd_type *type;
3090         int rc;
3091         ENTRY;
3092
3093         /* print an address of _any_ initialized kernel symbol from this
3094          * module, to allow debugging with gdb that doesn't support data
3095          * symbols from modules.*/
3096         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3097
3098         rc = lu_kmem_init(osc_caches);
3099         if (rc)
3100                 RETURN(rc);
3101
3102         type = class_search_type(LUSTRE_OSP_NAME);
3103         if (type != NULL && type->typ_procsym != NULL)
3104                 enable_proc = false;
3105
3106         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3107                                  LUSTRE_OSC_NAME, &osc_device_type);
3108         if (rc) {
3109                 lu_kmem_fini(osc_caches);
3110                 RETURN(rc);
3111         }
3112
3113         RETURN(rc);
3114 }
3115
3116 static void /*__exit*/ osc_exit(void)
3117 {
3118         class_unregister_type(LUSTRE_OSC_NAME);
3119         lu_kmem_fini(osc_caches);
3120 }
3121
3122 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3123 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3124 MODULE_LICENSE("GPL");
3125
3126 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);