Whamcloud - gitweb
LU-5568 lnet: fix kernel crash when network failed to start
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
54
55 struct osc_brw_async_args {
56         struct obdo              *aa_oa;
57         int                       aa_requested_nob;
58         int                       aa_nio_count;
59         obd_count                 aa_page_count;
60         int                       aa_resends;
61         struct brw_page **aa_ppga;
62         struct client_obd        *aa_cli;
63         struct list_head          aa_oaps;
64         struct list_head          aa_exts;
65         struct obd_capa  *aa_ocapa;
66         struct cl_req            *aa_clerq;
67 };
68
69 #define osc_grant_args osc_brw_async_args
70
/*
 * Async-args block for OST_GETATTR: osc_getattr_async() stores the caller's
 * obd_info here and osc_getattr_interpret() reads it back on completion.
 */
struct osc_async_args {
        struct obd_info *aa_oi;         /* caller's request descriptor */
};
74
75 struct osc_setattr_args {
76         struct obdo             *sa_oa;
77         obd_enqueue_update_f     sa_upcall;
78         void                    *sa_cookie;
79 };
80
81 struct osc_fsync_args {
82         struct obd_info *fa_oi;
83         obd_enqueue_update_f     fa_upcall;
84         void                    *fa_cookie;
85 };
86
87 struct osc_enqueue_args {
88         struct obd_export       *oa_exp;
89         ldlm_type_t             oa_type;
90         ldlm_mode_t             oa_mode;
91         __u64                   *oa_flags;
92         osc_enqueue_upcall_f    oa_upcall;
93         void                    *oa_cookie;
94         struct ost_lvb          *oa_lvb;
95         struct lustre_handle    oa_lockh;
96         unsigned int            oa_agl:1;
97 };
98
99 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
100 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
101                          void *data, int rc);
102
103 static inline void osc_pack_capa(struct ptlrpc_request *req,
104                                  struct ost_body *body, void *capa)
105 {
106         struct obd_capa *oc = (struct obd_capa *)capa;
107         struct lustre_capa *c;
108
109         if (!capa)
110                 return;
111
112         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
113         LASSERT(c);
114         capa_cpy(c, oc);
115         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
116         DEBUG_CAPA(D_SEC, c, "pack");
117 }
118
119 static inline void osc_pack_req_body(struct ptlrpc_request *req,
120                                      struct obd_info *oinfo)
121 {
122         struct ost_body *body;
123
124         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
125         LASSERT(body);
126
127         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
128                              oinfo->oi_oa);
129         osc_pack_capa(req, body, oinfo->oi_capa);
130 }
131
132 static inline void osc_set_capa_size(struct ptlrpc_request *req,
133                                      const struct req_msg_field *field,
134                                      struct obd_capa *oc)
135 {
136         if (oc == NULL)
137                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
138         else
139                 /* it is already calculated as sizeof struct obd_capa */
140                 ;
141 }
142
143 static int osc_getattr_interpret(const struct lu_env *env,
144                                  struct ptlrpc_request *req,
145                                  struct osc_async_args *aa, int rc)
146 {
147         struct ost_body *body;
148         ENTRY;
149
150         if (rc != 0)
151                 GOTO(out, rc);
152
153         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
154         if (body) {
155                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
156                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
157                                      aa->aa_oi->oi_oa, &body->oa);
158
159                 /* This should really be sent by the OST */
160                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
161                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
162         } else {
163                 CDEBUG(D_INFO, "can't unpack ost_body\n");
164                 rc = -EPROTO;
165                 aa->aa_oi->oi_oa->o_valid = 0;
166         }
167 out:
168         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
169         RETURN(rc);
170 }
171
172 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
173                              struct ptlrpc_request_set *set)
174 {
175         struct ptlrpc_request *req;
176         struct osc_async_args *aa;
177         int                    rc;
178         ENTRY;
179
180         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
181         if (req == NULL)
182                 RETURN(-ENOMEM);
183
184         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
185         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
186         if (rc) {
187                 ptlrpc_request_free(req);
188                 RETURN(rc);
189         }
190
191         osc_pack_req_body(req, oinfo);
192
193         ptlrpc_request_set_replen(req);
194         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
195
196         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
197         aa = ptlrpc_req_async_args(req);
198         aa->aa_oi = oinfo;
199
200         ptlrpc_set_add_req(set, req);
201         RETURN(0);
202 }
203
204 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
205                        struct obd_info *oinfo)
206 {
207         struct ptlrpc_request *req;
208         struct ost_body       *body;
209         int                    rc;
210         ENTRY;
211
212         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
213         if (req == NULL)
214                 RETURN(-ENOMEM);
215
216         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
217         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
218         if (rc) {
219                 ptlrpc_request_free(req);
220                 RETURN(rc);
221         }
222
223         osc_pack_req_body(req, oinfo);
224
225         ptlrpc_request_set_replen(req);
226
227         rc = ptlrpc_queue_wait(req);
228         if (rc)
229                 GOTO(out, rc);
230
231         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
232         if (body == NULL)
233                 GOTO(out, rc = -EPROTO);
234
235         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
236         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
237                              &body->oa);
238
239         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
240         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
241
242         EXIT;
243  out:
244         ptlrpc_req_finished(req);
245         return rc;
246 }
247
248 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
249                        struct obd_info *oinfo, struct obd_trans_info *oti)
250 {
251         struct ptlrpc_request *req;
252         struct ost_body       *body;
253         int                    rc;
254         ENTRY;
255
256         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
257
258         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
259         if (req == NULL)
260                 RETURN(-ENOMEM);
261
262         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
263         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
264         if (rc) {
265                 ptlrpc_request_free(req);
266                 RETURN(rc);
267         }
268
269         osc_pack_req_body(req, oinfo);
270
271         ptlrpc_request_set_replen(req);
272
273         rc = ptlrpc_queue_wait(req);
274         if (rc)
275                 GOTO(out, rc);
276
277         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
278         if (body == NULL)
279                 GOTO(out, rc = -EPROTO);
280
281         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
282                              &body->oa);
283
284         EXIT;
285 out:
286         ptlrpc_req_finished(req);
287         RETURN(rc);
288 }
289
290 static int osc_setattr_interpret(const struct lu_env *env,
291                                  struct ptlrpc_request *req,
292                                  struct osc_setattr_args *sa, int rc)
293 {
294         struct ost_body *body;
295         ENTRY;
296
297         if (rc != 0)
298                 GOTO(out, rc);
299
300         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
301         if (body == NULL)
302                 GOTO(out, rc = -EPROTO);
303
304         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
305                              &body->oa);
306 out:
307         rc = sa->sa_upcall(sa->sa_cookie, rc);
308         RETURN(rc);
309 }
310
311 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
312                            struct obd_trans_info *oti,
313                            obd_enqueue_update_f upcall, void *cookie,
314                            struct ptlrpc_request_set *rqset)
315 {
316         struct ptlrpc_request   *req;
317         struct osc_setattr_args *sa;
318         int                      rc;
319         ENTRY;
320
321         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
322         if (req == NULL)
323                 RETURN(-ENOMEM);
324
325         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
326         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
327         if (rc) {
328                 ptlrpc_request_free(req);
329                 RETURN(rc);
330         }
331
332         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
333                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
334
335         osc_pack_req_body(req, oinfo);
336
337         ptlrpc_request_set_replen(req);
338
339         /* do mds to ost setattr asynchronously */
340         if (!rqset) {
341                 /* Do not wait for response. */
342                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
343         } else {
344                 req->rq_interpret_reply =
345                         (ptlrpc_interpterer_t)osc_setattr_interpret;
346
347                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
348                 sa = ptlrpc_req_async_args(req);
349                 sa->sa_oa = oinfo->oi_oa;
350                 sa->sa_upcall = upcall;
351                 sa->sa_cookie = cookie;
352
353                 if (rqset == PTLRPCD_SET)
354                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
355                 else
356                         ptlrpc_set_add_req(rqset, req);
357         }
358
359         RETURN(0);
360 }
361
362 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
363                              struct obd_trans_info *oti,
364                              struct ptlrpc_request_set *rqset)
365 {
366         return osc_setattr_async_base(exp, oinfo, oti,
367                                       oinfo->oi_cb_up, oinfo, rqset);
368 }
369
370 static int osc_create(const struct lu_env *env, struct obd_export *exp,
371                       struct obdo *oa, struct obd_trans_info *oti)
372 {
373         struct ptlrpc_request *req;
374         struct ost_body       *body;
375         int                    rc;
376         ENTRY;
377
378         LASSERT(oa != NULL);
379         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
380         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
381
382         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
383         if (req == NULL)
384                 GOTO(out, rc = -ENOMEM);
385
386         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
387         if (rc) {
388                 ptlrpc_request_free(req);
389                 GOTO(out, rc);
390         }
391
392         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
393         LASSERT(body);
394
395         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
396
397         ptlrpc_request_set_replen(req);
398
399         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
400             oa->o_flags == OBD_FL_DELORPHAN) {
401                 DEBUG_REQ(D_HA, req,
402                           "delorphan from OST integration");
403                 /* Don't resend the delorphan req */
404                 req->rq_no_resend = req->rq_no_delay = 1;
405         }
406
407         rc = ptlrpc_queue_wait(req);
408         if (rc)
409                 GOTO(out_req, rc);
410
411         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
412         if (body == NULL)
413                 GOTO(out_req, rc = -EPROTO);
414
415         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
416         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
417
418         oa->o_blksize = cli_brw_size(exp->exp_obd);
419         oa->o_valid |= OBD_MD_FLBLKSZ;
420
421         if (oti != NULL) {
422                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
423                         if (oti->oti_logcookies == NULL)
424                                 oti->oti_logcookies = &oti->oti_onecookie;
425
426                         *oti->oti_logcookies = oa->o_lcookie;
427                 }
428         }
429
430         CDEBUG(D_HA, "transno: "LPD64"\n",
431                lustre_msg_get_transno(req->rq_repmsg));
432 out_req:
433         ptlrpc_req_finished(req);
434 out:
435         RETURN(rc);
436 }
437
438 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
439                    obd_enqueue_update_f upcall, void *cookie,
440                    struct ptlrpc_request_set *rqset)
441 {
442         struct ptlrpc_request   *req;
443         struct osc_setattr_args *sa;
444         struct ost_body         *body;
445         int                      rc;
446         ENTRY;
447
448         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
449         if (req == NULL)
450                 RETURN(-ENOMEM);
451
452         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
453         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
454         if (rc) {
455                 ptlrpc_request_free(req);
456                 RETURN(rc);
457         }
458         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
459         ptlrpc_at_set_req_timeout(req);
460
461         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
462         LASSERT(body);
463         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
464                              oinfo->oi_oa);
465         osc_pack_capa(req, body, oinfo->oi_capa);
466
467         ptlrpc_request_set_replen(req);
468
469         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
470         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
471         sa = ptlrpc_req_async_args(req);
472         sa->sa_oa     = oinfo->oi_oa;
473         sa->sa_upcall = upcall;
474         sa->sa_cookie = cookie;
475         if (rqset == PTLRPCD_SET)
476                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
477         else
478                 ptlrpc_set_add_req(rqset, req);
479
480         RETURN(0);
481 }
482
483 static int osc_sync_interpret(const struct lu_env *env,
484                               struct ptlrpc_request *req,
485                               void *arg, int rc)
486 {
487         struct osc_fsync_args *fa = arg;
488         struct ost_body *body;
489         ENTRY;
490
491         if (rc)
492                 GOTO(out, rc);
493
494         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
495         if (body == NULL) {
496                 CERROR ("can't unpack ost_body\n");
497                 GOTO(out, rc = -EPROTO);
498         }
499
500         *fa->fa_oi->oi_oa = body->oa;
501 out:
502         rc = fa->fa_upcall(fa->fa_cookie, rc);
503         RETURN(rc);
504 }
505
506 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
507                   obd_enqueue_update_f upcall, void *cookie,
508                   struct ptlrpc_request_set *rqset)
509 {
510         struct ptlrpc_request *req;
511         struct ost_body       *body;
512         struct osc_fsync_args *fa;
513         int                    rc;
514         ENTRY;
515
516         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
517         if (req == NULL)
518                 RETURN(-ENOMEM);
519
520         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
521         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
522         if (rc) {
523                 ptlrpc_request_free(req);
524                 RETURN(rc);
525         }
526
527         /* overload the size and blocks fields in the oa with start/end */
528         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
529         LASSERT(body);
530         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
531                              oinfo->oi_oa);
532         osc_pack_capa(req, body, oinfo->oi_capa);
533
534         ptlrpc_request_set_replen(req);
535         req->rq_interpret_reply = osc_sync_interpret;
536
537         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
538         fa = ptlrpc_req_async_args(req);
539         fa->fa_oi = oinfo;
540         fa->fa_upcall = upcall;
541         fa->fa_cookie = cookie;
542
543         if (rqset == PTLRPCD_SET)
544                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
545         else
546                 ptlrpc_set_add_req(rqset, req);
547
548         RETURN (0);
549 }
550
551 /* Find and cancel locally locks matched by @mode in the resource found by
552  * @objid. Found locks are added into @cancel list. Returns the amount of
553  * locks added to @cancels list. */
554 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
555                                    struct list_head *cancels,
556                                    ldlm_mode_t mode, __u64 lock_flags)
557 {
558         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
559         struct ldlm_res_id res_id;
560         struct ldlm_resource *res;
561         int count;
562         ENTRY;
563
564         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
565          * export) but disabled through procfs (flag in NS).
566          *
567          * This distinguishes from a case when ELC is not supported originally,
568          * when we still want to cancel locks in advance and just cancel them
569          * locally, without sending any RPC. */
570         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
571                 RETURN(0);
572
573         ostid_build_res_name(&oa->o_oi, &res_id);
574         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
575         if (IS_ERR(res))
576                 RETURN(0);
577
578         LDLM_RESOURCE_ADDREF(res);
579         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
580                                            lock_flags, 0, NULL);
581         LDLM_RESOURCE_DELREF(res);
582         ldlm_resource_putref(res);
583         RETURN(count);
584 }
585
586 static int osc_destroy_interpret(const struct lu_env *env,
587                                  struct ptlrpc_request *req, void *data,
588                                  int rc)
589 {
590         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
591
592         atomic_dec(&cli->cl_destroy_in_flight);
593         wake_up(&cli->cl_destroy_waitq);
594         return 0;
595 }
596
597 static int osc_can_send_destroy(struct client_obd *cli)
598 {
599         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
600             cli->cl_max_rpcs_in_flight) {
601                 /* The destroy request can be sent */
602                 return 1;
603         }
604         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
605             cli->cl_max_rpcs_in_flight) {
606                 /*
607                  * The counter has been modified between the two atomic
608                  * operations.
609                  */
610                 wake_up(&cli->cl_destroy_waitq);
611         }
612         return 0;
613 }
614
615 /* Destroy requests can be async always on the client, and we don't even really
616  * care about the return code since the client cannot do anything at all about
617  * a destroy failure.
618  * When the MDS is unlinking a filename, it saves the file objects into a
619  * recovery llog, and these object records are cancelled when the OST reports
620  * they were destroyed and sync'd to disk (i.e. transaction committed).
621  * If the client dies, or the OST is down when the object should be destroyed,
622  * the records are not cancelled, and when the OST reconnects to the MDS next,
623  * it will retrieve the llog unlink logs and then sends the log cancellation
624  * cookies to the MDS after committing destroy transactions. */
625 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
626                        struct obdo *oa, struct obd_trans_info *oti)
627 {
628         struct client_obd     *cli = &exp->exp_obd->u.cli;
629         struct ptlrpc_request *req;
630         struct ost_body       *body;
631         struct list_head       cancels = LIST_HEAD_INIT(cancels);
632         int rc, count;
633         ENTRY;
634
635         if (!oa) {
636                 CDEBUG(D_INFO, "oa NULL\n");
637                 RETURN(-EINVAL);
638         }
639
640         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
641                                         LDLM_FL_DISCARD_DATA);
642
643         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
644         if (req == NULL) {
645                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
646                 RETURN(-ENOMEM);
647         }
648
649         osc_set_capa_size(req, &RMF_CAPA1, NULL);
650         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
651                                0, &cancels, count);
652         if (rc) {
653                 ptlrpc_request_free(req);
654                 RETURN(rc);
655         }
656
657         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
658         ptlrpc_at_set_req_timeout(req);
659
660         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
661                 oa->o_lcookie = *oti->oti_logcookies;
662         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
663         LASSERT(body);
664         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
665
666         ptlrpc_request_set_replen(req);
667
668         /* If osc_destory is for destroying the unlink orphan,
669          * sent from MDT to OST, which should not be blocked here,
670          * because the process might be triggered by ptlrpcd, and
671          * it is not good to block ptlrpcd thread (b=16006)*/
672         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
673                 req->rq_interpret_reply = osc_destroy_interpret;
674                 if (!osc_can_send_destroy(cli)) {
675                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
676                                                           NULL);
677
678                         /*
679                          * Wait until the number of on-going destroy RPCs drops
680                          * under max_rpc_in_flight
681                          */
682                         l_wait_event_exclusive(cli->cl_destroy_waitq,
683                                                osc_can_send_destroy(cli), &lwi);
684                 }
685         }
686
687         /* Do not wait for response */
688         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
689         RETURN(0);
690 }
691
692 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
693                                 long writing_bytes)
694 {
695         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
696
697         LASSERT(!(oa->o_valid & bits));
698
699         oa->o_valid |= bits;
700         client_obd_list_lock(&cli->cl_loi_list_lock);
701         oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
702         if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
703                      cli->cl_dirty_max_pages)) {
704                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
705                        cli->cl_dirty_pages, cli->cl_dirty_transit,
706                        cli->cl_dirty_max_pages);
707                 oa->o_undirty = 0;
708         } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
709                             atomic_long_read(&obd_dirty_transit_pages) >
710                             (obd_max_dirty_pages + 1))) {
711                 /* The atomic_read() allowing the atomic_inc() are
712                  * not covered by a lock thus they may safely race and trip
713                  * this CERROR() unless we add in a small fudge factor (+1). */
714                 CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
715                        cli->cl_import->imp_obd->obd_name,
716                        atomic_long_read(&obd_dirty_pages),
717                        atomic_long_read(&obd_dirty_transit_pages),
718                        obd_max_dirty_pages);
719                 oa->o_undirty = 0;
720         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
721                             0x7fffffff)) {
722                 CERROR("dirty %lu - dirty_max %lu too big???\n",
723                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
724                 oa->o_undirty = 0;
725         } else {
726                 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
727                                       PAGE_CACHE_SHIFT) *
728                                      (cli->cl_max_rpcs_in_flight + 1);
729                 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
730                                     max_in_flight);
731         }
732         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
733         oa->o_dropped = cli->cl_lost_grant;
734         cli->cl_lost_grant = 0;
735         client_obd_list_unlock(&cli->cl_loi_list_lock);
736         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
737                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
738
739 }
740
741 void osc_update_next_shrink(struct client_obd *cli)
742 {
743         cli->cl_next_shrink_grant =
744                 cfs_time_shift(cli->cl_grant_shrink_interval);
745         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
746                cli->cl_next_shrink_grant);
747 }
748
749 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
750 {
751         client_obd_list_lock(&cli->cl_loi_list_lock);
752         cli->cl_avail_grant += grant;
753         client_obd_list_unlock(&cli->cl_loi_list_lock);
754 }
755
756 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
757 {
758         if (body->oa.o_valid & OBD_MD_FLGRANT) {
759                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
760                 __osc_update_grant(cli, body->oa.o_grant);
761         }
762 }
763
764 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
765                               obd_count keylen, void *key, obd_count vallen,
766                               void *val, struct ptlrpc_request_set *set);
767
768 static int osc_shrink_grant_interpret(const struct lu_env *env,
769                                       struct ptlrpc_request *req,
770                                       void *aa, int rc)
771 {
772         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
773         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
774         struct ost_body *body;
775
776         if (rc != 0) {
777                 __osc_update_grant(cli, oa->o_grant);
778                 GOTO(out, rc);
779         }
780
781         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
782         LASSERT(body);
783         osc_update_grant(cli, body);
784 out:
785         OBDO_FREE(oa);
786         return rc;
787 }
788
789 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
790 {
791         client_obd_list_lock(&cli->cl_loi_list_lock);
792         oa->o_grant = cli->cl_avail_grant / 4;
793         cli->cl_avail_grant -= oa->o_grant;
794         client_obd_list_unlock(&cli->cl_loi_list_lock);
795         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
796                 oa->o_valid |= OBD_MD_FLFLAGS;
797                 oa->o_flags = 0;
798         }
799         oa->o_flags |= OBD_FL_SHRINK_GRANT;
800         osc_update_next_shrink(cli);
801 }
802
803 /* Shrink the current grant, either from some large amount to enough for a
804  * full set of in-flight RPCs, or if we have already shrunk to that limit
805  * then to enough for a single RPC.  This avoids keeping more grant than
806  * needed, and avoids shrinking the grant piecemeal. */
807 static int osc_shrink_grant(struct client_obd *cli)
808 {
809         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
810                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
811
812         client_obd_list_lock(&cli->cl_loi_list_lock);
813         if (cli->cl_avail_grant <= target_bytes)
814                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
815         client_obd_list_unlock(&cli->cl_loi_list_lock);
816
817         return osc_shrink_grant_to_target(cli, target_bytes);
818 }
819
820 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
821 {
822         int                     rc = 0;
823         struct ost_body        *body;
824         ENTRY;
825
826         client_obd_list_lock(&cli->cl_loi_list_lock);
827         /* Don't shrink if we are already above or below the desired limit
828          * We don't want to shrink below a single RPC, as that will negatively
829          * impact block allocation and long-term performance. */
830         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
831                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
832
833         if (target_bytes >= cli->cl_avail_grant) {
834                 client_obd_list_unlock(&cli->cl_loi_list_lock);
835                 RETURN(0);
836         }
837         client_obd_list_unlock(&cli->cl_loi_list_lock);
838
839         OBD_ALLOC_PTR(body);
840         if (!body)
841                 RETURN(-ENOMEM);
842
843         osc_announce_cached(cli, &body->oa, 0);
844
845         client_obd_list_lock(&cli->cl_loi_list_lock);
846         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
847         cli->cl_avail_grant = target_bytes;
848         client_obd_list_unlock(&cli->cl_loi_list_lock);
849         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
850                 body->oa.o_valid |= OBD_MD_FLFLAGS;
851                 body->oa.o_flags = 0;
852         }
853         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
854         osc_update_next_shrink(cli);
855
856         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
857                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
858                                 sizeof(*body), body, NULL);
859         if (rc != 0)
860                 __osc_update_grant(cli, body->oa.o_grant);
861         OBD_FREE_PTR(body);
862         RETURN(rc);
863 }
864
865 static int osc_should_shrink_grant(struct client_obd *client)
866 {
867         cfs_time_t time = cfs_time_current();
868         cfs_time_t next_shrink = client->cl_next_shrink_grant;
869
870         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
871              OBD_CONNECT_GRANT_SHRINK) == 0)
872                 return 0;
873
874         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
875                 /* Get the current RPC size directly, instead of going via:
876                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
877                  * Keep comment here so that it can be found by searching. */
878                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
879
880                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
881                     client->cl_avail_grant > brw_size)
882                         return 1;
883                 else
884                         osc_update_next_shrink(client);
885         }
886         return 0;
887 }
888
889 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
890 {
891         struct client_obd *client;
892
893         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
894                 if (osc_should_shrink_grant(client))
895                         osc_shrink_grant(client);
896         }
897         return 0;
898 }
899
900 static int osc_add_shrink_grant(struct client_obd *client)
901 {
902         int rc;
903
904         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
905                                        TIMEOUT_GRANT,
906                                        osc_grant_shrink_grant_cb, NULL,
907                                        &client->cl_grant_shrink_list);
908         if (rc) {
909                 CERROR("add grant client %s error %d\n",
910                         client->cl_import->imp_obd->obd_name, rc);
911                 return rc;
912         }
913         CDEBUG(D_CACHE, "add grant client %s \n",
914                client->cl_import->imp_obd->obd_name);
915         osc_update_next_shrink(client);
916         return 0;
917 }
918
919 static int osc_del_shrink_grant(struct client_obd *client)
920 {
921         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
922                                          TIMEOUT_GRANT);
923 }
924
925 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
926 {
927         /*
928          * ocd_grant is the total grant amount we're expect to hold: if we've
929          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
930          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
931          * dirty.
932          *
933          * race is tolerable here: if we're evicted, but imp_state already
934          * left EVICTED state, then cl_dirty_pages must be 0 already.
935          */
936         client_obd_list_lock(&cli->cl_loi_list_lock);
937         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
938                 cli->cl_avail_grant = ocd->ocd_grant;
939         else
940                 cli->cl_avail_grant = ocd->ocd_grant -
941                                       (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
942
943         if (cli->cl_avail_grant < 0) {
944                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
945                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
946                       ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
947                 /* workaround for servers which do not have the patch from
948                  * LU-2679 */
949                 cli->cl_avail_grant = ocd->ocd_grant;
950         }
951
952         /* determine the appropriate chunk size used by osc_extent. */
953         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
954         client_obd_list_unlock(&cli->cl_loi_list_lock);
955
956         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
957                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
958                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
959
960         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
961             list_empty(&cli->cl_grant_shrink_list))
962                 osc_add_shrink_grant(cli);
963 }
964
965 /* We assume that the reason this OSC got a short read is because it read
966  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
967  * via the LOV, and it _knows_ it's reading inside the file, it's just that
968  * this stripe never got written at or beyond this stripe offset yet. */
969 static void handle_short_read(int nob_read, obd_count page_count,
970                               struct brw_page **pga)
971 {
972         char *ptr;
973         int i = 0;
974
975         /* skip bytes read OK */
976         while (nob_read > 0) {
977                 LASSERT (page_count > 0);
978
979                 if (pga[i]->count > nob_read) {
980                         /* EOF inside this page */
981                         ptr = kmap(pga[i]->pg) +
982                                 (pga[i]->off & ~CFS_PAGE_MASK);
983                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
984                         kunmap(pga[i]->pg);
985                         page_count--;
986                         i++;
987                         break;
988                 }
989
990                 nob_read -= pga[i]->count;
991                 page_count--;
992                 i++;
993         }
994
995         /* zero remaining pages */
996         while (page_count-- > 0) {
997                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
998                 memset(ptr, 0, pga[i]->count);
999                 kunmap(pga[i]->pg);
1000                 i++;
1001         }
1002 }
1003
1004 static int check_write_rcs(struct ptlrpc_request *req,
1005                            int requested_nob, int niocount,
1006                            obd_count page_count, struct brw_page **pga)
1007 {
1008         int     i;
1009         __u32   *remote_rcs;
1010
1011         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1012                                                   sizeof(*remote_rcs) *
1013                                                   niocount);
1014         if (remote_rcs == NULL) {
1015                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1016                 return(-EPROTO);
1017         }
1018
1019         /* return error if any niobuf was in error */
1020         for (i = 0; i < niocount; i++) {
1021                 if ((int)remote_rcs[i] < 0)
1022                         return(remote_rcs[i]);
1023
1024                 if (remote_rcs[i] != 0) {
1025                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1026                                 i, remote_rcs[i], req);
1027                         return(-EPROTO);
1028                 }
1029         }
1030
1031         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1032                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1033                        req->rq_bulk->bd_nob_transferred, requested_nob);
1034                 return(-EPROTO);
1035         }
1036
1037         return (0);
1038 }
1039
1040 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1041 {
1042         if (p1->flag != p2->flag) {
1043                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1044                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1045                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1046
1047                 /* warn if we try to combine flags that we don't know to be
1048                  * safe to combine */
1049                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1050                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1051                               "report this at https://jira.hpdd.intel.com/\n",
1052                               p1->flag, p2->flag);
1053                 }
1054                 return 0;
1055         }
1056
1057         return (p1->off + p1->count == p2->off);
1058 }
1059
1060 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1061                                    struct brw_page **pga, int opc,
1062                                    cksum_type_t cksum_type)
1063 {
1064         __u32                           cksum;
1065         int                             i = 0;
1066         struct cfs_crypto_hash_desc     *hdesc;
1067         unsigned int                    bufsize;
1068         int                             err;
1069         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1070
1071         LASSERT(pg_count > 0);
1072
1073         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1074         if (IS_ERR(hdesc)) {
1075                 CERROR("Unable to initialize checksum hash %s\n",
1076                        cfs_crypto_hash_name(cfs_alg));
1077                 return PTR_ERR(hdesc);
1078         }
1079
1080         while (nob > 0 && pg_count > 0) {
1081                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1082
1083                 /* corrupt the data before we compute the checksum, to
1084                  * simulate an OST->client data error */
1085                 if (i == 0 && opc == OST_READ &&
1086                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1087                         unsigned char *ptr = kmap(pga[i]->pg);
1088                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1089
1090                         memcpy(ptr + off, "bad1", min(4, nob));
1091                         kunmap(pga[i]->pg);
1092                 }
1093                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1094                                             pga[i]->off & ~CFS_PAGE_MASK,
1095                                             count);
1096                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1097                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1098
1099                 nob -= pga[i]->count;
1100                 pg_count--;
1101                 i++;
1102         }
1103
1104         bufsize = sizeof(cksum);
1105         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1106
1107         /* For sending we only compute the wrong checksum instead
1108          * of corrupting the data so it is still correct on a redo */
1109         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1110                 cksum++;
1111
1112         return cksum;
1113 }
1114
1115 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1116                                 struct lov_stripe_md *lsm, obd_count page_count,
1117                                 struct brw_page **pga,
1118                                 struct ptlrpc_request **reqp,
1119                                 struct obd_capa *ocapa, int reserve,
1120                                 int resend)
1121 {
1122         struct ptlrpc_request   *req;
1123         struct ptlrpc_bulk_desc *desc;
1124         struct ost_body         *body;
1125         struct obd_ioobj        *ioobj;
1126         struct niobuf_remote    *niobuf;
1127         int niocount, i, requested_nob, opc, rc;
1128         struct osc_brw_async_args *aa;
1129         struct req_capsule      *pill;
1130         struct brw_page *pg_prev;
1131
1132         ENTRY;
1133         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1134                 RETURN(-ENOMEM); /* Recoverable */
1135         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1136                 RETURN(-EINVAL); /* Fatal */
1137
1138         if ((cmd & OBD_BRW_WRITE) != 0) {
1139                 opc = OST_WRITE;
1140                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1141                                                 cli->cl_import->imp_rq_pool,
1142                                                 &RQF_OST_BRW_WRITE);
1143         } else {
1144                 opc = OST_READ;
1145                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1146         }
1147         if (req == NULL)
1148                 RETURN(-ENOMEM);
1149
1150         for (niocount = i = 1; i < page_count; i++) {
1151                 if (!can_merge_pages(pga[i - 1], pga[i]))
1152                         niocount++;
1153         }
1154
1155         pill = &req->rq_pill;
1156         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1157                              sizeof(*ioobj));
1158         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1159                              niocount * sizeof(*niobuf));
1160         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1161
1162         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1163         if (rc) {
1164                 ptlrpc_request_free(req);
1165                 RETURN(rc);
1166         }
1167         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1168         ptlrpc_at_set_req_timeout(req);
1169         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1170          * retry logic */
1171         req->rq_no_retry_einprogress = 1;
1172
1173         desc = ptlrpc_prep_bulk_imp(req, page_count,
1174                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1175                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1176                 OST_BULK_PORTAL);
1177
1178         if (desc == NULL)
1179                 GOTO(out, rc = -ENOMEM);
1180         /* NB request now owns desc and will free it when it gets freed */
1181
1182         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1183         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1184         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1185         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1186
1187         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1188
1189         obdo_to_ioobj(oa, ioobj);
1190         ioobj->ioo_bufcnt = niocount;
1191         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1192          * that might be send for this request.  The actual number is decided
1193          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1194          * "max - 1" for old client compatibility sending "0", and also so the
1195          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1196         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1197         osc_pack_capa(req, body, ocapa);
1198         LASSERT(page_count > 0);
1199         pg_prev = pga[0];
1200         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1201                 struct brw_page *pg = pga[i];
1202                 int poff = pg->off & ~CFS_PAGE_MASK;
1203
1204                 LASSERT(pg->count > 0);
1205                 /* make sure there is no gap in the middle of page array */
1206                 LASSERTF(page_count == 1 ||
1207                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1208                           ergo(i > 0 && i < page_count - 1,
1209                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1210                           ergo(i == page_count - 1, poff == 0)),
1211                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1212                          i, page_count, pg, pg->off, pg->count);
1213                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1214                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1215                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1216                          i, page_count,
1217                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1218                          pg_prev->pg, page_private(pg_prev->pg),
1219                          pg_prev->pg->index, pg_prev->off);
1220                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1221                         (pg->flag & OBD_BRW_SRVLOCK));
1222
1223                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1224                 requested_nob += pg->count;
1225
1226                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1227                         niobuf--;
1228                         niobuf->rnb_len += pg->count;
1229                 } else {
1230                         niobuf->rnb_offset = pg->off;
1231                         niobuf->rnb_len    = pg->count;
1232                         niobuf->rnb_flags  = pg->flag;
1233                 }
1234                 pg_prev = pg;
1235         }
1236
1237         LASSERTF((void *)(niobuf - niocount) ==
1238                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1239                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1240                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1241
1242         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1243         if (resend) {
1244                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1245                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1246                         body->oa.o_flags = 0;
1247                 }
1248                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1249         }
1250
1251         if (osc_should_shrink_grant(cli))
1252                 osc_shrink_grant_local(cli, &body->oa);
1253
1254         /* size[REQ_REC_OFF] still sizeof (*body) */
1255         if (opc == OST_WRITE) {
1256                 if (cli->cl_checksum &&
1257                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1258                         /* store cl_cksum_type in a local variable since
1259                          * it can be changed via lprocfs */
1260                         cksum_type_t cksum_type = cli->cl_cksum_type;
1261
1262                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1263                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1264                                 body->oa.o_flags = 0;
1265                         }
1266                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1267                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1268                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1269                                                              page_count, pga,
1270                                                              OST_WRITE,
1271                                                              cksum_type);
1272                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1273                                body->oa.o_cksum);
1274                         /* save this in 'oa', too, for later checking */
1275                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1276                         oa->o_flags |= cksum_type_pack(cksum_type);
1277                 } else {
1278                         /* clear out the checksum flag, in case this is a
1279                          * resend but cl_checksum is no longer set. b=11238 */
1280                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1281                 }
1282                 oa->o_cksum = body->oa.o_cksum;
1283                 /* 1 RC per niobuf */
1284                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1285                                      sizeof(__u32) * niocount);
1286         } else {
1287                 if (cli->cl_checksum &&
1288                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1289                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1290                                 body->oa.o_flags = 0;
1291                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1292                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1293                 }
1294         }
1295         ptlrpc_request_set_replen(req);
1296
1297         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1298         aa = ptlrpc_req_async_args(req);
1299         aa->aa_oa = oa;
1300         aa->aa_requested_nob = requested_nob;
1301         aa->aa_nio_count = niocount;
1302         aa->aa_page_count = page_count;
1303         aa->aa_resends = 0;
1304         aa->aa_ppga = pga;
1305         aa->aa_cli = cli;
1306         INIT_LIST_HEAD(&aa->aa_oaps);
1307         if (ocapa && reserve)
1308                 aa->aa_ocapa = capa_get(ocapa);
1309
1310         *reqp = req;
1311         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1312         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1313                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1314                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1315         RETURN(0);
1316
1317  out:
1318         ptlrpc_req_finished(req);
1319         RETURN(rc);
1320 }
1321
1322 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1323                                 __u32 client_cksum, __u32 server_cksum, int nob,
1324                                 obd_count page_count, struct brw_page **pga,
1325                                 cksum_type_t client_cksum_type)
1326 {
1327         __u32 new_cksum;
1328         char *msg;
1329         cksum_type_t cksum_type;
1330
1331         if (server_cksum == client_cksum) {
1332                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1333                 return 0;
1334         }
1335
1336         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1337                                        oa->o_flags : 0);
1338         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1339                                       cksum_type);
1340
1341         if (cksum_type != client_cksum_type)
1342                 msg = "the server did not use the checksum type specified in "
1343                       "the original request - likely a protocol problem";
1344         else if (new_cksum == server_cksum)
1345                 msg = "changed on the client after we checksummed it - "
1346                       "likely false positive due to mmap IO (bug 11742)";
1347         else if (new_cksum == client_cksum)
1348                 msg = "changed in transit before arrival at OST";
1349         else
1350                 msg = "changed in transit AND doesn't match the original - "
1351                       "likely false positive due to mmap IO (bug 11742)";
1352
1353         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1354                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1355                            msg, libcfs_nid2str(peer->nid),
1356                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1357                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1358                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1359                            POSTID(&oa->o_oi), pga[0]->off,
1360                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1361         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1362                "client csum now %x\n", client_cksum, client_cksum_type,
1363                server_cksum, cksum_type, new_cksum);
1364         return 1;
1365 }
1366
1367 /* Note rc enters this function as number of bytes transferred */
1368 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1369 {
1370         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1371         const lnet_process_id_t *peer =
1372                         &req->rq_import->imp_connection->c_peer;
1373         struct client_obd *cli = aa->aa_cli;
1374         struct ost_body *body;
1375         __u32 client_cksum = 0;
1376         ENTRY;
1377
1378         if (rc < 0 && rc != -EDQUOT) {
1379                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1380                 RETURN(rc);
1381         }
1382
1383         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1384         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1385         if (body == NULL) {
1386                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1387                 RETURN(-EPROTO);
1388         }
1389
1390         /* set/clear over quota flag for a uid/gid */
1391         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1392             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1393                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1394
1395                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1396                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1397                        body->oa.o_flags);
1398                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1399         }
1400
1401         osc_update_grant(cli, body);
1402
1403         if (rc < 0)
1404                 RETURN(rc);
1405
1406         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1407                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1408
1409         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1410                 if (rc > 0) {
1411                         CERROR("Unexpected +ve rc %d\n", rc);
1412                         RETURN(-EPROTO);
1413                 }
1414                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1415
1416                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1417                         RETURN(-EAGAIN);
1418
1419                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1420                     check_write_checksum(&body->oa, peer, client_cksum,
1421                                          body->oa.o_cksum, aa->aa_requested_nob,
1422                                          aa->aa_page_count, aa->aa_ppga,
1423                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1424                         RETURN(-EAGAIN);
1425
1426                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1427                                      aa->aa_page_count, aa->aa_ppga);
1428                 GOTO(out, rc);
1429         }
1430
1431         /* The rest of this function executes only for OST_READs */
1432
1433         /* if unwrap_bulk failed, return -EAGAIN to retry */
1434         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1435         if (rc < 0)
1436                 GOTO(out, rc = -EAGAIN);
1437
1438         if (rc > aa->aa_requested_nob) {
1439                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1440                        aa->aa_requested_nob);
1441                 RETURN(-EPROTO);
1442         }
1443
1444         if (rc != req->rq_bulk->bd_nob_transferred) {
1445                 CERROR ("Unexpected rc %d (%d transferred)\n",
1446                         rc, req->rq_bulk->bd_nob_transferred);
1447                 return (-EPROTO);
1448         }
1449
1450         if (rc < aa->aa_requested_nob)
1451                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1452
1453         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1454                 static int cksum_counter;
1455                 __u32      server_cksum = body->oa.o_cksum;
1456                 char      *via;
1457                 char      *router;
1458                 cksum_type_t cksum_type;
1459
1460                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1461                                                body->oa.o_flags : 0);
1462                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1463                                                  aa->aa_ppga, OST_READ,
1464                                                  cksum_type);
1465
1466                 if (peer->nid == req->rq_bulk->bd_sender) {
1467                         via = router = "";
1468                 } else {
1469                         via = " via ";
1470                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1471                 }
1472
1473                 if (server_cksum != client_cksum) {
1474                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1475                                            "%s%s%s inode "DFID" object "DOSTID
1476                                            " extent ["LPU64"-"LPU64"]\n",
1477                                            req->rq_import->imp_obd->obd_name,
1478                                            libcfs_nid2str(peer->nid),
1479                                            via, router,
1480                                            body->oa.o_valid & OBD_MD_FLFID ?
1481                                                 body->oa.o_parent_seq : (__u64)0,
1482                                            body->oa.o_valid & OBD_MD_FLFID ?
1483                                                 body->oa.o_parent_oid : 0,
1484                                            body->oa.o_valid & OBD_MD_FLFID ?
1485                                                 body->oa.o_parent_ver : 0,
1486                                            POSTID(&body->oa.o_oi),
1487                                            aa->aa_ppga[0]->off,
1488                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1489                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1490                                                                         1);
1491                         CERROR("client %x, server %x, cksum_type %x\n",
1492                                client_cksum, server_cksum, cksum_type);
1493                         cksum_counter = 0;
1494                         aa->aa_oa->o_cksum = client_cksum;
1495                         rc = -EAGAIN;
1496                 } else {
1497                         cksum_counter++;
1498                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1499                         rc = 0;
1500                 }
1501         } else if (unlikely(client_cksum)) {
1502                 static int cksum_missed;
1503
1504                 cksum_missed++;
1505                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1506                         CERROR("Checksum %u requested from %s but not sent\n",
1507                                cksum_missed, libcfs_nid2str(peer->nid));
1508         } else {
1509                 rc = 0;
1510         }
1511 out:
1512         if (rc >= 0)
1513                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1514                                      aa->aa_oa, &body->oa);
1515
1516         RETURN(rc);
1517 }
1518
/*
 * Re-issue a bulk read/write RPC after a recoverable error.
 *
 * A new request is prepared from the obdo and page array kept in @aa.  The
 * interpret/commit callbacks, the async args, the import generation, the
 * page (oap) list, the extent list and the capability are then moved from
 * @request to the new request, which is queued with ptlrpcd_add_req().
 *
 * @request: the request that failed
 * @aa:      async args of @request
 * @rc:      error that triggered the retry; it is only logged and is then
 *           reused as a local return code
 *
 * Returns 0 once the new request is queued, -EINTR if a page referencing
 * @request was interrupted, or the error from osc_brw_prep_request().
 */
1519 static int osc_brw_redo_request(struct ptlrpc_request *request,
1520                                 struct osc_brw_async_args *aa, int rc)
1521 {
1522         struct ptlrpc_request *new_req;
1523         struct osc_brw_async_args *new_aa;
1524         struct osc_async_page *oap;
1525         ENTRY;
1526
1527         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1528                   "redo for recoverable error %d", rc);
1529
             /* Build the replacement request with the same direction (read or
              * write) as the original one. */
1530         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1531                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1532                                   aa->aa_cli, aa->aa_oa,
1533                                   NULL /* lsm unused by osc currently */,
1534                                   aa->aa_page_count, aa->aa_ppga,
1535                                   &new_req, aa->aa_ocapa, 0, 1);
1536         if (rc)
1537                 RETURN(rc);
1538
             /* Give up (and drop the request just built) if a page holding a
              * reference on the old request has been interrupted. */
1539         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1540                 if (oap->oap_request != NULL) {
1541                         LASSERTF(request == oap->oap_request,
1542                                  "request %p != oap_request %p\n",
1543                                  request, oap->oap_request);
1544                         if (oap->oap_interrupted) {
1545                                 ptlrpc_req_finished(new_req);
1546                                 RETURN(-EINTR);
1547                         }
1548                 }
1549         }
1550         /* New request takes over pga and oaps from old request.
1551          * Note that copying a list_head doesn't work, need to move it... */
1552         aa->aa_resends++;
1553         new_req->rq_interpret_reply = request->rq_interpret_reply;
1554         new_req->rq_async_args = request->rq_async_args;
1555         new_req->rq_commit_cb = request->rq_commit_cb;
1556         /* cap resend delay to the current request timeout, this is similar to
1557          * what ptlrpc does (see after_reply()) */
1558         if (aa->aa_resends > new_req->rq_timeout)
1559                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1560         else
1561                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1562         new_req->rq_generation_set = 1;
1563         new_req->rq_import_generation = request->rq_import_generation;
1564
1565         new_aa = ptlrpc_req_async_args(new_req);
1566
             /* The async args were copied by value above, so the list heads in
              * new_aa are reinitialized and the entries spliced over; the old
              * lists are left empty. */
1567         INIT_LIST_HEAD(&new_aa->aa_oaps);
1568         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1569         INIT_LIST_HEAD(&new_aa->aa_exts);
1570         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1571         new_aa->aa_resends = aa->aa_resends;
1572
             /* Pages that referenced the old request drop that reference and
              * take one on the new request instead. */
1573         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1574                 if (oap->oap_request) {
1575                         ptlrpc_req_finished(oap->oap_request);
1576                         oap->oap_request = ptlrpc_request_addref(new_req);
1577                 }
1578         }
1579
             /* The capability pointer moves to the new args. */
1580         new_aa->aa_ocapa = aa->aa_ocapa;
1581         aa->aa_ocapa = NULL;
1582
1583         /* XXX: This code will run into problem if we're going to support
1584          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1585          * and wait for all of them to be finished. We should inherit request
1586          * set from old request. */
1587         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1588
1589         DEBUG_REQ(D_INFO, new_req, "new request");
1590         RETURN(0);
1591 }
1592
1593 /*
1594  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1595  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1596  * fine for our small page arrays and doesn't require allocation.  its an
1597  * insertion sort that swaps elements that are strides apart, shrinking the
1598  * stride down until its '1' and the array is sorted.
1599  */
1600 static void sort_brw_pages(struct brw_page **array, int num)
1601 {
1602         int stride, i, j;
1603         struct brw_page *tmp;
1604
1605         if (num == 1)
1606                 return;
1607         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1608                 ;
1609
1610         do {
1611                 stride /= 3;
1612                 for (i = stride ; i < num ; i++) {
1613                         tmp = array[i];
1614                         j = i;
1615                         while (j >= stride && array[j - stride]->off > tmp->off) {
1616                                 array[j] = array[j - stride];
1617                                 j -= stride;
1618                         }
1619                         array[j] = tmp;
1620                 }
1621         } while (stride > 1);
1622 }
1623
1624 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1625 {
1626         LASSERT(ppga != NULL);
1627         OBD_FREE(ppga, sizeof(*ppga) * count);
1628 }
1629
/*
 * Reply-interpret callback for bulk read/write RPCs; osc_build_rpc() installs
 * it as rq_interpret_reply.
 *
 * The request is finished with osc_brw_fini_request().  On a recoverable
 * error the RPC is resent through osc_brw_redo_request() when that is still
 * allowed.  Otherwise the object attributes are updated from the returned
 * obdo (success only), every extent of the RPC is finished, the obdo and the
 * page array are freed, the in-flight counter is dropped and more I/O is
 * unplugged.
 *
 * @env:  execution environment
 * @req:  the completed request
 * @data: the request's struct osc_brw_async_args
 * @rc:   completion status handed in by the caller
 *
 * Returns 0 when the RPC succeeded or was successfully requeued, a negative
 * errno otherwise.  A recoverable error that is not retried, if it is still
 * -EAGAIN or -EINPROGRESS, is reported as -EIO.
 */
1630 static int brw_interpret(const struct lu_env *env,
1631                          struct ptlrpc_request *req, void *data, int rc)
1632 {
1633         struct osc_brw_async_args *aa = data;
1634         struct osc_extent *ext;
1635         struct osc_extent *tmp;
1636         struct client_obd *cli = aa->aa_cli;
1637         ENTRY;
1638
1639         rc = osc_brw_fini_request(req, rc);
1640         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1641         /* When server return -EINPROGRESS, client should always retry
1642          * regardless of the number of times the bulk was resent already. */
1643         if (osc_recoverable_error(rc)) {
                     /* No resend when the import generation has changed since
                      * the request was sent; the case is only logged. */
1644                 if (req->rq_import_generation !=
1645                     req->rq_import->imp_generation) {
1646                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1647                                ""DOSTID", rc = %d.\n",
1648                                req->rq_import->imp_obd->obd_name,
1649                                POSTID(&aa->aa_oa->o_oi), rc);
1650                 } else if (rc == -EINPROGRESS ||
1651                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1652                         rc = osc_brw_redo_request(req, aa, rc);
1653                 } else {
                             /* NOTE(review): this message prints the object id
                              * as LPU64:LPU64 while the message above uses
                              * DOSTID for the same POSTID() arguments --
                              * confirm whether the difference is intended. */
1654                         CERROR("%s: too many resent retries for object: "
1655                                ""LPU64":"LPU64", rc = %d.\n",
1656                                req->rq_import->imp_obd->obd_name,
1657                                POSTID(&aa->aa_oa->o_oi), rc);
1658                 }
1659
                     /* rc == 0 here means the request was requeued: the new
                      * request now owns the pages and extents, so stop. */
1660                 if (rc == 0)
1661                         RETURN(0);
1662                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1663                         rc = -EIO;
1664         }
1665
1666         if (aa->aa_ocapa) {
1667                 capa_put(aa->aa_ocapa);
1668                 aa->aa_ocapa = NULL;
1669         }
1670
             /* Success: copy the attributes flagged valid in the obdo into the
              * cl_object, under the object's attribute lock. */
1671         if (rc == 0) {
1672                 struct obdo *oa = aa->aa_oa;
1673                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1674                 unsigned long valid = 0;
1675                 struct cl_object *obj;
1676                 struct osc_async_page *last;
1677
                     /* last entry of the page array; osc_build_rpc() sorts the
                      * array by offset before the request is prepared */
1678                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1679                 obj = osc2cl(last->oap_obj);
1680
1681                 cl_object_attr_lock(obj);
1682                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1683                         attr->cat_blocks = oa->o_blocks;
1684                         valid |= CAT_BLOCKS;
1685                 }
1686                 if (oa->o_valid & OBD_MD_FLMTIME) {
1687                         attr->cat_mtime = oa->o_mtime;
1688                         valid |= CAT_MTIME;
1689                 }
1690                 if (oa->o_valid & OBD_MD_FLATIME) {
1691                         attr->cat_atime = oa->o_atime;
1692                         valid |= CAT_ATIME;
1693                 }
1694                 if (oa->o_valid & OBD_MD_FLCTIME) {
1695                         attr->cat_ctime = oa->o_ctime;
1696                         valid |= CAT_CTIME;
1697                 }
1698
1699                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1700                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                             /* object offset just past the last byte of the
                              * last page */
1701                         loff_t last_off = last->oap_count + last->oap_obj_off +
1702                                 last->oap_page_off;
1703
1704                         /* Change file size if this is an out of quota or
1705                          * direct IO write and it extends the file size */
1706                         if (loi->loi_lvb.lvb_size < last_off) {
1707                                 attr->cat_size = last_off;
1708                                 valid |= CAT_SIZE;
1709                         }
1710                         /* Extend KMS if it's not a lockless write */
1711                         if (loi->loi_kms < last_off &&
1712                             oap2osc_page(last)->ops_srvlock == 0) {
1713                                 attr->cat_kms = last_off;
1714                                 valid |= CAT_KMS;
1715                         }
1716                 }
1717
1718                 if (valid != 0)
1719                         cl_object_attr_set(env, obj, attr, valid);
1720                 cl_object_attr_unlock(obj);
1721         }
1722         OBDO_FREE(aa->aa_oa);
1723
1724         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1725                 osc_inc_unstable_pages(req);
1726
             /* Finish every extent carried by this RPC with the final rc. */
1727         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1728                 list_del_init(&ext->oe_link);
1729                 osc_extent_finish(env, ext, 1, rc);
1730         }
1731         LASSERT(list_empty(&aa->aa_exts));
1732         LASSERT(list_empty(&aa->aa_oaps));
1733
             /* Complete the cl_req with the error, or with the number of
              * bytes transferred when there was none. */
1734         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1735                           req->rq_bulk->bd_nob_transferred);
1736         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1737         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1738
1739         client_obd_list_lock(&cli->cl_loi_list_lock);
1740         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1741          * is called so we know whether to go to sync BRWs or wait for more
1742          * RPCs to complete */
1743         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1744                 cli->cl_w_in_flight--;
1745         else
1746                 cli->cl_r_in_flight--;
1747         osc_wake_cache_waiters(cli);
1748         client_obd_list_unlock(&cli->cl_loi_list_lock);
1749
1750         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1751         RETURN(rc);
1752 }
1753
1754 static void brw_commit(struct ptlrpc_request *req)
1755 {
1756         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1757          * this called via the rq_commit_cb, I need to ensure
1758          * osc_dec_unstable_pages is still called. Otherwise unstable
1759          * pages may be leaked. */
1760         spin_lock(&req->rq_lock);
1761         if (likely(req->rq_unstable)) {
1762                 req->rq_unstable = 0;
1763                 spin_unlock(&req->rq_lock);
1764
1765                 osc_dec_unstable_pages(req);
1766         } else {
1767                 req->rq_committed = 1;
1768                 spin_unlock(&req->rq_lock);
1769         }
1770 }
1771
1772 /**
1773  * Build an RPC by the list of extent @ext_list. The caller must ensure
1774  * that the total pages in this list are NOT over max pages per RPC.
1775  * Extents in the list must be in OES_RPC state.
      *
      * All pages of all extents are gathered into one offset-sorted page
      * array, a single bulk read/write request is prepared for them and
      * handed to ptlrpcd_add_req().  brw_interpret() and brw_commit() are
      * installed as the request's interpret and commit callbacks.
      *
      * @env:      execution environment
      * @cli:      client obd the RPC is sent for
      * @ext_list: non-empty list of extents; it is emptied by this function
      *            both on success (the extents move to the request's async
      *            args) and on failure (each extent is finished with the
      *            error)
      * @cmd:      OBD_BRW_* command; OBD_BRW_WRITE selects a write request
      * @pol:      ptlrpcd policy passed through to ptlrpcd_add_req()
      *
      * Returns 0 when the request has been queued, a negative errno otherwise.
1776  */
1777 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1778                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
1779 {
1780         struct ptlrpc_request           *req = NULL;
1781         struct osc_extent               *ext;
1782         struct brw_page                 **pga = NULL;
1783         struct osc_brw_async_args       *aa = NULL;
1784         struct obdo                     *oa = NULL;
1785         struct osc_async_page           *oap;
1786         struct osc_async_page           *tmp;
1787         struct cl_req                   *clerq = NULL;
1788         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1789                                                                       CRT_READ;
1790         struct cl_req_attr              *crattr = NULL;
1791         obd_off                         starting_offset = OBD_OBJECT_EOF;
1792         obd_off                         ending_offset = 0;
1793         int                             mpflag = 0;
1794         int                             mem_tight = 0;
1795         int                             page_count = 0;
1796         bool                            soft_sync = false;
1797         int                             i;
1798         int                             rc;
1799         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1800
1801         ENTRY;
1802         LASSERT(!list_empty(ext_list));
1803
1804         /* add pages into rpc_list to build BRW rpc */
1805         list_for_each_entry(ext, ext_list, oe_link) {
1806                 LASSERT(ext->oe_state == OES_RPC);
1807                 mem_tight |= ext->oe_memalloc;
1808                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1809                         ++page_count;
1810                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
                             /* Track the lowest start and highest end offset.
                              * A page that does not lower the start must begin
                              * at page offset 0, and one that does not raise
                              * the end must finish at PAGE_CACHE_SIZE. */
1811                         if (starting_offset > oap->oap_obj_off)
1812                                 starting_offset = oap->oap_obj_off;
1813                         else
1814                                 LASSERT(oap->oap_page_off == 0);
1815                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1816                                 ending_offset = oap->oap_obj_off +
1817                                                 oap->oap_count;
1818                         else
1819                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1820                                         PAGE_CACHE_SIZE);
1821                 }
1822         }
1823
1824         soft_sync = osc_over_unstable_soft_limit(cli);
             /* The saved memory-pressure state is restored at the 'out' label. */
1825         if (mem_tight)
1826                 mpflag = cfs_memory_pressure_get_and_set();
1827
1828         OBD_ALLOC(crattr, sizeof(*crattr));
1829         if (crattr == NULL)
1830                 GOTO(out, rc = -ENOMEM);
1831
1832         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1833         if (pga == NULL)
1834                 GOTO(out, rc = -ENOMEM);
1835
1836         OBDO_ALLOC(oa);
1837         if (oa == NULL)
1838                 GOTO(out, rc = -ENOMEM);
1839
             /* Fill the page array and add each page to the cl_req; the cl_req
              * itself is allocated when the first page is seen. */
1840         i = 0;
1841         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1842                 struct cl_page *page = oap2cl_page(oap);
1843                 if (clerq == NULL) {
1844                         clerq = cl_req_alloc(env, page, crt,
1845                                              1 /* only 1-object rpcs for now */);
1846                         if (IS_ERR(clerq))
1847                                 GOTO(out, rc = PTR_ERR(clerq));
1848                 }
1849                 if (mem_tight)
1850                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1851                 if (soft_sync)
1852                         oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1853                 pga[i] = &oap->oap_brw_page;
1854                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1855                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1856                        pga[i]->pg, page_index(oap->oap_page), oap,
1857                        pga[i]->flag);
1858                 i++;
1859                 cl_req_page_add(env, clerq, page);
1860         }
1861
1862         /* always get the data for the obdo for the rpc */
1863         LASSERT(clerq != NULL);
1864         crattr->cra_oa = oa;
1865         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1866
1867         rc = cl_req_prep(env, clerq);
1868         if (rc != 0) {
1869                 CERROR("cl_req_prep failed: %d\n", rc);
1870                 GOTO(out, rc);
1871         }
1872
1873         sort_brw_pages(pga, page_count);
1874         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1875                         pga, &req, crattr->cra_capa, 1, 0);
1876         if (rc != 0) {
1877                 CERROR("prep_req failed: %d\n", rc);
1878                 GOTO(out, rc);
1879         }
1880
             /* No step below can fail, so from here on the request owns oa,
              * pga and clerq (they are released in brw_interpret()). */
1881         req->rq_commit_cb = brw_commit;
1882         req->rq_interpret_reply = brw_interpret;
1883
1884         if (mem_tight != 0)
1885                 req->rq_memalloc = 1;
1886
1887         /* Need to update the timestamps after the request is built in case
1888          * we race with setattr (locally or in queue at OST).  If OST gets
1889          * later setattr before earlier BRW (as determined by the request xid),
1890          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1891          * way to do this in a single call.  bug 10150 */
1892         cl_req_attr_set(env, clerq, crattr,
1893                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1894
1895         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1896
             /* Move the page list and the caller's extent list into the
              * request's async args; both source lists are left empty. */
1897         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1898         aa = ptlrpc_req_async_args(req);
1899         INIT_LIST_HEAD(&aa->aa_oaps);
1900         list_splice_init(&rpc_list, &aa->aa_oaps);
1901         INIT_LIST_HEAD(&aa->aa_exts);
1902         list_splice_init(ext_list, &aa->aa_exts);
1903         aa->aa_clerq = clerq;
1904
1905         /* queued sync pages can be torn down while the pages
1906          * were between the pending list and the rpc */
1907         tmp = NULL;
1908         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1909                 /* only one oap gets a request reference */
1910                 if (tmp == NULL)
1911                         tmp = oap;
1912                 if (oap->oap_interrupted && !req->rq_intr) {
1913                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1914                                         oap, req);
1915                         ptlrpc_mark_interrupted(req);
1916                 }
1917         }
             /* tmp is the first page on the list; it holds the reference. */
1918         if (tmp != NULL)
1919                 tmp->oap_request = ptlrpc_request_addref(req);
1920
             /* Account the RPC as in flight and update the per-direction
              * histograms, under cl_loi_list_lock. */
1921         client_obd_list_lock(&cli->cl_loi_list_lock);
1922         starting_offset >>= PAGE_CACHE_SHIFT;
1923         if (cmd == OBD_BRW_READ) {
1924                 cli->cl_r_in_flight++;
1925                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1926                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1927                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1928                                       starting_offset + 1);
1929         } else {
1930                 cli->cl_w_in_flight++;
1931                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1932                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1933                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1934                                       starting_offset + 1);
1935         }
1936         client_obd_list_unlock(&cli->cl_loi_list_lock);
1937
1938         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1939                   page_count, aa, cli->cl_r_in_flight,
1940                   cli->cl_w_in_flight);
1941
1942         /* XXX: Maybe the caller can check the RPC bulk descriptor to
1943          * see which CPU/NUMA node the majority of pages were allocated
1944          * on, and try to assign the async RPC to the CPU core
1945          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
1946          *
1947          * But on the other hand, we expect that multiple ptlrpcd
1948          * threads and the initial write sponsor can run in parallel,
1949          * especially when data checksum is enabled, which is CPU-bound
1950          * operation and single ptlrpcd thread cannot process in time.
1951          * So more ptlrpcd threads sharing BRW load
1952          * (with PDL_POLICY_ROUND) seems better.
1953          */
1954         ptlrpcd_add_req(req, pol, -1);
1955         rc = 0;
1956         EXIT;
1957
             /* Common exit: the success path falls through to here as well. */
1958 out:
1959         if (mem_tight != 0)
1960                 cfs_memory_pressure_restore(mpflag);
1961
1962         if (crattr != NULL) {
1963                 capa_put(crattr->cra_capa);
1964                 OBD_FREE(crattr, sizeof(*crattr));
1965         }
1966
             /* Failure: no request was created, so free what was allocated
              * here and finish every extent with the error. */
1967         if (rc != 0) {
1968                 LASSERT(req == NULL);
1969
1970                 if (oa)
1971                         OBDO_FREE(oa);
1972                 if (pga)
1973                         OBD_FREE(pga, sizeof(*pga) * page_count);
1974                 /* this should happen rarely and is pretty bad, it makes the
1975                  * pending list not follow the dirty order */
1976                 while (!list_empty(ext_list)) {
1977                         ext = list_entry(ext_list->next, struct osc_extent,
1978                                          oe_link);
1979                         list_del_init(&ext->oe_link);
1980                         osc_extent_finish(env, ext, 0, rc);
1981                 }
                     /* clerq may hold an ERR_PTR if cl_req_alloc() failed */
1982                 if (clerq && !IS_ERR(clerq))
1983                         cl_req_completion(env, clerq, rc);
1984         }
1985         RETURN(rc);
1986 }
1987
1988 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1989                                         struct ldlm_enqueue_info *einfo)
1990 {
1991         void *data = einfo->ei_cbdata;
1992         int set = 0;
1993
1994         LASSERT(lock != NULL);
1995         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1996         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1997         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1998         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1999
2000         lock_res_and_lock(lock);
2001
2002         if (lock->l_ast_data == NULL)
2003                 lock->l_ast_data = data;
2004         if (lock->l_ast_data == data)
2005                 set = 1;
2006
2007         unlock_res_and_lock(lock);
2008
2009         return set;
2010 }
2011
2012 static int osc_set_data_with_check(struct lustre_handle *lockh,
2013                                    struct ldlm_enqueue_info *einfo)
2014 {
2015         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2016         int set = 0;
2017
2018         if (lock != NULL) {
2019                 set = osc_set_lock_data_with_check(lock, einfo);
2020                 LDLM_LOCK_PUT(lock);
2021         } else
2022                 CERROR("lockh %p, data %p - client evicted?\n",
2023                        lockh, einfo->ei_cbdata);
2024         return set;
2025 }
2026
2027 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2028                              ldlm_iterator_t replace, void *data)
2029 {
2030         struct ldlm_res_id res_id;
2031         struct obd_device *obd = class_exp2obd(exp);
2032
2033         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2034         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2035         return 0;
2036 }
2037
2038 /* find any ldlm lock of the inode in osc
2039  * return 0    not find
2040  *        1    find one
2041  *      < 0    error */
2042 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2043                            ldlm_iterator_t replace, void *data)
2044 {
2045         struct ldlm_res_id res_id;
2046         struct obd_device *obd = class_exp2obd(exp);
2047         int rc = 0;
2048
2049         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2050         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2051         if (rc == LDLM_ITER_STOP)
2052                 return(1);
2053         if (rc == LDLM_ITER_CONTINUE)
2054                 return(0);
2055         return(rc);
2056 }
2057
2058 static int osc_enqueue_fini(struct ptlrpc_request *req,
2059                             osc_enqueue_upcall_f upcall, void *cookie,
2060                             struct lustre_handle *lockh, ldlm_mode_t mode,
2061                             __u64 *flags, int agl, int errcode)
2062 {
2063         bool intent = *flags & LDLM_FL_HAS_INTENT;
2064         int rc;
2065         ENTRY;
2066
2067         /* The request was created before ldlm_cli_enqueue call. */
2068         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2069                 struct ldlm_reply *rep;
2070
2071                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2072                 LASSERT(rep != NULL);
2073
2074                 rep->lock_policy_res1 =
2075                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2076                 if (rep->lock_policy_res1)
2077                         errcode = rep->lock_policy_res1;
2078                 if (!agl)
2079                         *flags |= LDLM_FL_LVB_READY;
2080         } else if (errcode == ELDLM_OK) {
2081                 *flags |= LDLM_FL_LVB_READY;
2082         }
2083
2084         /* Call the update callback. */
2085         rc = (*upcall)(cookie, lockh, errcode);
2086
2087         /* release the reference taken in ldlm_cli_enqueue() */
2088         if (errcode == ELDLM_LOCK_MATCHED)
2089                 errcode = ELDLM_OK;
2090         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2091                 ldlm_lock_decref(lockh, mode);
2092
2093         RETURN(rc);
2094 }
2095
/*
 * Reply-interpret callback for an asynchronous lock enqueue.
 *
 * The DLM side of the enqueue is completed with ldlm_cli_enqueue_fini() and
 * the OSC side (including the caller's upcall) with osc_enqueue_fini().  An
 * extra lock reference and an extra reference in the enqueue mode are held
 * across both calls and dropped at the end.
 *
 * @env: execution environment (not used in this function)
 * @req: the enqueue request
 * @aa:  enqueue arguments saved with the request
 * @rc:  completion status handed in by the caller
 *
 * Returns the value returned by osc_enqueue_fini().
 */
2096 static int osc_enqueue_interpret(const struct lu_env *env,
2097                                  struct ptlrpc_request *req,
2098                                  struct osc_enqueue_args *aa, int rc)
2099 {
2100         struct ldlm_lock *lock;
2101         struct lustre_handle *lockh = &aa->oa_lockh;
2102         ldlm_mode_t mode = aa->oa_mode;
2103         struct ost_lvb *lvb = aa->oa_lvb;
2104         __u32 lvb_len = sizeof(*lvb);
2105         __u64 flags = 0;
2106
2107         ENTRY;
2108
2109         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2110          * be valid. */
2111         lock = ldlm_handle2lock(lockh);
2112         LASSERTF(lock != NULL,
2113                  "lockh "LPX64", req %p, aa %p - client evicted?\n",
2114                  lockh->cookie, req, aa);
2115
2116         /* Take an additional reference so that a blocking AST that
2117          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2118          * to arrive after an upcall has been executed by
2119          * osc_enqueue_fini(). */
2120         ldlm_lock_addref(lockh, mode);
2121
2122         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2123         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2124
2125         /* Let CP AST to grant the lock first. */
2126         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2127
             /* An AGL enqueue carries neither an LVB nor a flags pointer; point
              * oa_flags at the local flags word so the calls below have
              * somewhere to read and write flags. */
2128         if (aa->oa_agl) {
2129                 LASSERT(aa->oa_lvb == NULL);
2130                 LASSERT(aa->oa_flags == NULL);
2131                 aa->oa_flags = &flags;
2132         }
2133
2134         /* Complete obtaining the lock procedure. */
2135         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2136                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2137                                    lockh, rc);
2138         /* Complete osc stuff. */
2139         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2140                               aa->oa_flags, aa->oa_agl, rc);
2141
2142         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2143
             /* Drop the additional mode reference taken above and the lock
              * reference obtained from ldlm_handle2lock(). */
2144         ldlm_lock_decref(lockh, mode);
2145         LDLM_LOCK_PUT(lock);
2146         RETURN(rc);
2147 }
2148
/* Sentinel request-set pointer: the fixed non-NULL value 1, which is not the
 * address of a real set and must never be dereferenced.
 * NOTE(review): presumably passed as a request set to ask for the RPC to be
 * handed to ptlrpcd instead of a caller-owned set -- confirm against users. */
2149 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2150
2151 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2152  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2153  * other synchronous requests, however keeping some locks and trying to obtain
2154  * others may take a considerable amount of time in a case of ost failure; and
2155  * when other sync requests do not get released lock from a client, the client
2156  * is evicted from the cluster -- such scenarios make life difficult, so
2157  * release locks just after they are obtained. */
2158 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2159                      __u64 *flags, ldlm_policy_data_t *policy,
2160                      struct ost_lvb *lvb, int kms_valid,
2161                      osc_enqueue_upcall_f upcall, void *cookie,
2162                      struct ldlm_enqueue_info *einfo,
2163                      struct ptlrpc_request_set *rqset, int async, int agl)
2164 {
2165         struct obd_device *obd = exp->exp_obd;
2166         struct lustre_handle lockh = { 0 };
2167         struct ptlrpc_request *req = NULL;
2168         int intent = *flags & LDLM_FL_HAS_INTENT;
2169         __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
2170         ldlm_mode_t mode;
2171         int rc;
2172         ENTRY;
2173
2174         /* Filesystem lock extents are extended to page boundaries so that
2175          * dealing with the page cache is a little smoother.  */
2176         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2177         policy->l_extent.end |= ~CFS_PAGE_MASK;
2178
2179         /*
2180          * kms is not valid when either object is completely fresh (so that no
2181          * locks are cached), or object was evicted. In the latter case cached
2182          * lock cannot be used, because it would prime inode state with
2183          * potentially stale LVB.
2184          */
2185         if (!kms_valid)
2186                 goto no_match;
2187
2188         /* Next, search for already existing extent locks that will cover us */
2189         /* If we're trying to read, we also search for an existing PW lock.  The
2190          * VFS and page cache already protect us locally, so lots of readers/
2191          * writers can share a single PW lock.
2192          *
2193          * There are problems with conversion deadlocks, so instead of
2194          * converting a read lock to a write lock, we'll just enqueue a new
2195          * one.
2196          *
2197          * At some point we should cancel the read lock instead of making them
2198          * send us a blocking callback, but there are problems with canceling
2199          * locks out from other users right now, too. */
2200         mode = einfo->ei_mode;
2201         if (einfo->ei_mode == LCK_PR)
2202                 mode |= LCK_PW;
2203         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2204                                einfo->ei_type, policy, mode, &lockh, 0);
2205         if (mode) {
2206                 struct ldlm_lock *matched;
2207
2208                 if (*flags & LDLM_FL_TEST_LOCK)
2209                         RETURN(ELDLM_OK);
2210
2211                 matched = ldlm_handle2lock(&lockh);
2212                 if (agl) {
2213                         /* AGL enqueues DLM locks speculatively. Therefore if
2214                          * it already exists a DLM lock, it wll just inform the
2215                          * caller to cancel the AGL process for this stripe. */
2216                         ldlm_lock_decref(&lockh, mode);
2217                         LDLM_LOCK_PUT(matched);
2218                         RETURN(-ECANCELED);
2219                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2220                         *flags |= LDLM_FL_LVB_READY;
2221
2222                         /* We already have a lock, and it's referenced. */
2223                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2224
2225                         ldlm_lock_decref(&lockh, mode);
2226                         LDLM_LOCK_PUT(matched);
2227                         RETURN(ELDLM_OK);
2228                 } else {
2229                         ldlm_lock_decref(&lockh, mode);
2230                         LDLM_LOCK_PUT(matched);
2231                 }
2232         }
2233
2234 no_match:
2235         if (*flags & LDLM_FL_TEST_LOCK)
2236                 RETURN(-ENOLCK);
2237
2238         if (intent) {
2239                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2240                                            &RQF_LDLM_ENQUEUE_LVB);
2241                 if (req == NULL)
2242                         RETURN(-ENOMEM);
2243
2244                 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2245                 if (rc < 0) {
2246                         ptlrpc_request_free(req);
2247                         RETURN(rc);
2248                 }
2249
2250                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2251                                      sizeof *lvb);
2252                 ptlrpc_request_set_replen(req);
2253         }
2254
2255         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2256         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2257
2258         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2259                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2260         if (async) {
2261                 if (!rc) {
2262                         struct osc_enqueue_args *aa;
2263                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2264                         aa = ptlrpc_req_async_args(req);
2265                         aa->oa_exp    = exp;
2266                         aa->oa_mode   = einfo->ei_mode;
2267                         aa->oa_type   = einfo->ei_type;
2268                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2269                         aa->oa_upcall = upcall;
2270                         aa->oa_cookie = cookie;
2271                         aa->oa_agl    = !!agl;
2272                         if (!agl) {
2273                                 aa->oa_flags  = flags;
2274                                 aa->oa_lvb    = lvb;
2275                         } else {
2276                                 /* AGL is essentially to enqueue an DLM lock
2277                                  * in advance, so we don't care about the
2278                                  * result of AGL enqueue. */
2279                                 aa->oa_lvb    = NULL;
2280                                 aa->oa_flags  = NULL;
2281                         }
2282
2283                         req->rq_interpret_reply =
2284                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2285                         if (rqset == PTLRPCD_SET)
2286                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2287                         else
2288                                 ptlrpc_set_add_req(rqset, req);
2289                 } else if (intent) {
2290                         ptlrpc_req_finished(req);
2291                 }
2292                 RETURN(rc);
2293         }
2294
2295         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2296                               flags, agl, rc);
2297         if (intent)
2298                 ptlrpc_req_finished(req);
2299
2300         RETURN(rc);
2301 }
2302
2303 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2304                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2305                    __u64 *flags, void *data, struct lustre_handle *lockh,
2306                    int unref)
2307 {
2308         struct obd_device *obd = exp->exp_obd;
2309         __u64 lflags = *flags;
2310         ldlm_mode_t rc;
2311         ENTRY;
2312
2313         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2314                 RETURN(-EIO);
2315
2316         /* Filesystem lock extents are extended to page boundaries so that
2317          * dealing with the page cache is a little smoother */
2318         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2319         policy->l_extent.end |= ~CFS_PAGE_MASK;
2320
2321         /* Next, search for already existing extent locks that will cover us */
2322         /* If we're trying to read, we also search for an existing PW lock.  The
2323          * VFS and page cache already protect us locally, so lots of readers/
2324          * writers can share a single PW lock. */
2325         rc = mode;
2326         if (mode == LCK_PR)
2327                 rc |= LCK_PW;
2328         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2329                              res_id, type, policy, rc, lockh, unref);
2330         if (rc) {
2331                 if (data != NULL) {
2332                         if (!osc_set_data_with_check(lockh, data)) {
2333                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2334                                         ldlm_lock_decref(lockh, rc);
2335                                 RETURN(0);
2336                         }
2337                 }
2338                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2339                         ldlm_lock_addref(lockh, LCK_PR);
2340                         ldlm_lock_decref(lockh, LCK_PW);
2341                 }
2342                 RETURN(rc);
2343         }
2344         RETURN(rc);
2345 }
2346
2347 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2348 {
2349         ENTRY;
2350
2351         if (unlikely(mode == LCK_GROUP))
2352                 ldlm_lock_decref_and_cancel(lockh, mode);
2353         else
2354                 ldlm_lock_decref(lockh, mode);
2355
2356         RETURN(0);
2357 }
2358
2359 static int osc_statfs_interpret(const struct lu_env *env,
2360                                 struct ptlrpc_request *req,
2361                                 struct osc_async_args *aa, int rc)
2362 {
2363         struct obd_statfs *msfs;
2364         ENTRY;
2365
2366         if (rc == -EBADR)
2367                 /* The request has in fact never been sent
2368                  * due to issues at a higher level (LOV).
2369                  * Exit immediately since the caller is
2370                  * aware of the problem and takes care
2371                  * of the clean up */
2372                  RETURN(rc);
2373
2374         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2375             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2376                 GOTO(out, rc = 0);
2377
2378         if (rc != 0)
2379                 GOTO(out, rc);
2380
2381         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2382         if (msfs == NULL) {
2383                 GOTO(out, rc = -EPROTO);
2384         }
2385
2386         *aa->aa_oi->oi_osfs = *msfs;
2387 out:
2388         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2389         RETURN(rc);
2390 }
2391
2392 static int osc_statfs_async(struct obd_export *exp,
2393                             struct obd_info *oinfo, __u64 max_age,
2394                             struct ptlrpc_request_set *rqset)
2395 {
2396         struct obd_device     *obd = class_exp2obd(exp);
2397         struct ptlrpc_request *req;
2398         struct osc_async_args *aa;
2399         int                    rc;
2400         ENTRY;
2401
2402         /* We could possibly pass max_age in the request (as an absolute
2403          * timestamp or a "seconds.usec ago") so the target can avoid doing
2404          * extra calls into the filesystem if that isn't necessary (e.g.
2405          * during mount that would help a bit).  Having relative timestamps
2406          * is not so great if request processing is slow, while absolute
2407          * timestamps are not ideal because they need time synchronization. */
2408         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2409         if (req == NULL)
2410                 RETURN(-ENOMEM);
2411
2412         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2413         if (rc) {
2414                 ptlrpc_request_free(req);
2415                 RETURN(rc);
2416         }
2417         ptlrpc_request_set_replen(req);
2418         req->rq_request_portal = OST_CREATE_PORTAL;
2419         ptlrpc_at_set_req_timeout(req);
2420
2421         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2422                 /* procfs requests not want stat in wait for avoid deadlock */
2423                 req->rq_no_resend = 1;
2424                 req->rq_no_delay = 1;
2425         }
2426
2427         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2428         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2429         aa = ptlrpc_req_async_args(req);
2430         aa->aa_oi = oinfo;
2431
2432         ptlrpc_set_add_req(rqset, req);
2433         RETURN(0);
2434 }
2435
2436 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2437                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2438 {
2439         struct obd_device     *obd = class_exp2obd(exp);
2440         struct obd_statfs     *msfs;
2441         struct ptlrpc_request *req;
2442         struct obd_import     *imp = NULL;
2443         int rc;
2444         ENTRY;
2445
2446         /*Since the request might also come from lprocfs, so we need
2447          *sync this with client_disconnect_export Bug15684*/
2448         down_read(&obd->u.cli.cl_sem);
2449         if (obd->u.cli.cl_import)
2450                 imp = class_import_get(obd->u.cli.cl_import);
2451         up_read(&obd->u.cli.cl_sem);
2452         if (!imp)
2453                 RETURN(-ENODEV);
2454
2455         /* We could possibly pass max_age in the request (as an absolute
2456          * timestamp or a "seconds.usec ago") so the target can avoid doing
2457          * extra calls into the filesystem if that isn't necessary (e.g.
2458          * during mount that would help a bit).  Having relative timestamps
2459          * is not so great if request processing is slow, while absolute
2460          * timestamps are not ideal because they need time synchronization. */
2461         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2462
2463         class_import_put(imp);
2464
2465         if (req == NULL)
2466                 RETURN(-ENOMEM);
2467
2468         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2469         if (rc) {
2470                 ptlrpc_request_free(req);
2471                 RETURN(rc);
2472         }
2473         ptlrpc_request_set_replen(req);
2474         req->rq_request_portal = OST_CREATE_PORTAL;
2475         ptlrpc_at_set_req_timeout(req);
2476
2477         if (flags & OBD_STATFS_NODELAY) {
2478                 /* procfs requests not want stat in wait for avoid deadlock */
2479                 req->rq_no_resend = 1;
2480                 req->rq_no_delay = 1;
2481         }
2482
2483         rc = ptlrpc_queue_wait(req);
2484         if (rc)
2485                 GOTO(out, rc);
2486
2487         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2488         if (msfs == NULL) {
2489                 GOTO(out, rc = -EPROTO);
2490         }
2491
2492         *osfs = *msfs;
2493
2494         EXIT;
2495  out:
2496         ptlrpc_req_finished(req);
2497         return rc;
2498 }
2499
2500 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2501                          void *karg, void *uarg)
2502 {
2503         struct obd_device *obd = exp->exp_obd;
2504         struct obd_ioctl_data *data = karg;
2505         int err = 0;
2506         ENTRY;
2507
2508         if (!try_module_get(THIS_MODULE)) {
2509                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2510                        module_name(THIS_MODULE));
2511                 return -EINVAL;
2512         }
2513         switch (cmd) {
2514         case OBD_IOC_CLIENT_RECOVER:
2515                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2516                                             data->ioc_inlbuf1, 0);
2517                 if (err > 0)
2518                         err = 0;
2519                 GOTO(out, err);
2520         case IOC_OSC_SET_ACTIVE:
2521                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2522                                                data->ioc_offset);
2523                 GOTO(out, err);
2524         case OBD_IOC_POLL_QUOTACHECK:
2525                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2526                 GOTO(out, err);
2527         case OBD_IOC_PING_TARGET:
2528                 err = ptlrpc_obd_ping(obd);
2529                 GOTO(out, err);
2530         default:
2531                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2532                        cmd, current_comm());
2533                 GOTO(out, err = -ENOTTY);
2534         }
2535 out:
2536         module_put(THIS_MODULE);
2537         return err;
2538 }
2539
2540 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2541                         obd_count keylen, void *key, __u32 *vallen, void *val,
2542                         struct lov_stripe_md *lsm)
2543 {
2544         ENTRY;
2545         if (!vallen || !val)
2546                 RETURN(-EFAULT);
2547
2548         if (KEY_IS(KEY_FIEMAP)) {
2549                 struct ll_fiemap_info_key *fm_key =
2550                                 (struct ll_fiemap_info_key *)key;
2551                 struct ldlm_res_id       res_id;
2552                 ldlm_policy_data_t       policy;
2553                 struct lustre_handle     lockh;
2554                 ldlm_mode_t              mode = 0;
2555                 struct ptlrpc_request   *req;
2556                 struct ll_user_fiemap   *reply;
2557                 char                    *tmp;
2558                 int                      rc;
2559
2560                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2561                         goto skip_locking;
2562
2563                 policy.l_extent.start = fm_key->fiemap.fm_start &
2564                                                 CFS_PAGE_MASK;
2565
2566                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2567                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2568                         policy.l_extent.end = OBD_OBJECT_EOF;
2569                 else
2570                         policy.l_extent.end = (fm_key->fiemap.fm_start +
2571                                 fm_key->fiemap.fm_length +
2572                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2573
2574                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2575                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2576                                        LDLM_FL_BLOCK_GRANTED |
2577                                        LDLM_FL_LVB_READY,
2578                                        &res_id, LDLM_EXTENT, &policy,
2579                                        LCK_PR | LCK_PW, &lockh, 0);
2580                 if (mode) { /* lock is cached on client */
2581                         if (mode != LCK_PR) {
2582                                 ldlm_lock_addref(&lockh, LCK_PR);
2583                                 ldlm_lock_decref(&lockh, LCK_PW);
2584                         }
2585                 } else { /* no cached lock, needs acquire lock on server side */
2586                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2587                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2588                 }
2589
2590 skip_locking:
2591                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2592                                            &RQF_OST_GET_INFO_FIEMAP);
2593                 if (req == NULL)
2594                         GOTO(drop_lock, rc = -ENOMEM);
2595
2596                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2597                                      RCL_CLIENT, keylen);
2598                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2599                                      RCL_CLIENT, *vallen);
2600                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2601                                      RCL_SERVER, *vallen);
2602
2603                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2604                 if (rc) {
2605                         ptlrpc_request_free(req);
2606                         GOTO(drop_lock, rc);
2607                 }
2608
2609                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2610                 memcpy(tmp, key, keylen);
2611                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2612                 memcpy(tmp, val, *vallen);
2613
2614                 ptlrpc_request_set_replen(req);
2615                 rc = ptlrpc_queue_wait(req);
2616                 if (rc)
2617                         GOTO(fini_req, rc);
2618
2619                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2620                 if (reply == NULL)
2621                         GOTO(fini_req, rc = -EPROTO);
2622
2623                 memcpy(val, reply, *vallen);
2624 fini_req:
2625                 ptlrpc_req_finished(req);
2626 drop_lock:
2627                 if (mode)
2628                         ldlm_lock_decref(&lockh, LCK_PR);
2629                 RETURN(rc);
2630         }
2631
2632         RETURN(-EINVAL);
2633 }
2634
2635 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2636                               obd_count keylen, void *key, obd_count vallen,
2637                               void *val, struct ptlrpc_request_set *set)
2638 {
2639         struct ptlrpc_request *req;
2640         struct obd_device     *obd = exp->exp_obd;
2641         struct obd_import     *imp = class_exp2cliimp(exp);
2642         char                  *tmp;
2643         int                    rc;
2644         ENTRY;
2645
2646         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2647
2648         if (KEY_IS(KEY_CHECKSUM)) {
2649                 if (vallen != sizeof(int))
2650                         RETURN(-EINVAL);
2651                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2652                 RETURN(0);
2653         }
2654
2655         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2656                 sptlrpc_conf_client_adapt(obd);
2657                 RETURN(0);
2658         }
2659
2660         if (KEY_IS(KEY_FLUSH_CTX)) {
2661                 sptlrpc_import_flush_my_ctx(imp);
2662                 RETURN(0);
2663         }
2664
2665         if (KEY_IS(KEY_CACHE_SET)) {
2666                 struct client_obd *cli = &obd->u.cli;
2667
2668                 LASSERT(cli->cl_cache == NULL); /* only once */
2669                 cli->cl_cache = (struct cl_client_cache *)val;
2670                 atomic_inc(&cli->cl_cache->ccc_users);
2671                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2672
2673                 /* add this osc into entity list */
2674                 LASSERT(list_empty(&cli->cl_lru_osc));
2675                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2676                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2677                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2678
2679                 RETURN(0);
2680         }
2681
2682         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2683                 struct client_obd *cli = &obd->u.cli;
2684                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2685                 long target = *(long *)val;
2686
2687                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2688                 *(long *)val -= nr;
2689                 RETURN(0);
2690         }
2691
2692         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2693                 RETURN(-EINVAL);
2694
2695         /* We pass all other commands directly to OST. Since nobody calls osc
2696            methods directly and everybody is supposed to go through LOV, we
2697            assume lov checked invalid values for us.
2698            The only recognised values so far are evict_by_nid and mds_conn.
2699            Even if something bad goes through, we'd get a -EINVAL from OST
2700            anyway. */
2701
2702         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2703                                                 &RQF_OST_SET_GRANT_INFO :
2704                                                 &RQF_OBD_SET_INFO);
2705         if (req == NULL)
2706                 RETURN(-ENOMEM);
2707
2708         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2709                              RCL_CLIENT, keylen);
2710         if (!KEY_IS(KEY_GRANT_SHRINK))
2711                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2712                                      RCL_CLIENT, vallen);
2713         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2714         if (rc) {
2715                 ptlrpc_request_free(req);
2716                 RETURN(rc);
2717         }
2718
2719         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2720         memcpy(tmp, key, keylen);
2721         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2722                                                         &RMF_OST_BODY :
2723                                                         &RMF_SETINFO_VAL);
2724         memcpy(tmp, val, vallen);
2725
2726         if (KEY_IS(KEY_GRANT_SHRINK)) {
2727                 struct osc_grant_args *aa;
2728                 struct obdo *oa;
2729
2730                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2731                 aa = ptlrpc_req_async_args(req);
2732                 OBDO_ALLOC(oa);
2733                 if (!oa) {
2734                         ptlrpc_req_finished(req);
2735                         RETURN(-ENOMEM);
2736                 }
2737                 *oa = ((struct ost_body *)val)->oa;
2738                 aa->aa_oa = oa;
2739                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2740         }
2741
2742         ptlrpc_request_set_replen(req);
2743         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2744                 LASSERT(set != NULL);
2745                 ptlrpc_set_add_req(set, req);
2746                 ptlrpc_check_set(NULL, set);
2747         } else
2748                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2749
2750         RETURN(0);
2751 }
2752
2753 static int osc_reconnect(const struct lu_env *env,
2754                          struct obd_export *exp, struct obd_device *obd,
2755                          struct obd_uuid *cluuid,
2756                          struct obd_connect_data *data,
2757                          void *localdata)
2758 {
2759         struct client_obd *cli = &obd->u.cli;
2760
2761         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2762                 long lost_grant;
2763
2764                 client_obd_list_lock(&cli->cl_loi_list_lock);
2765                 data->ocd_grant = (cli->cl_avail_grant +
2766                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2767                                   2 * cli_brw_size(obd);
2768                 lost_grant = cli->cl_lost_grant;
2769                 cli->cl_lost_grant = 0;
2770                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2771
2772                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2773                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2774                        data->ocd_version, data->ocd_grant, lost_grant);
2775         }
2776
2777         RETURN(0);
2778 }
2779
2780 static int osc_disconnect(struct obd_export *exp)
2781 {
2782         struct obd_device *obd = class_exp2obd(exp);
2783         int rc;
2784
2785         rc = client_disconnect_export(exp);
2786         /**
2787          * Initially we put del_shrink_grant before disconnect_export, but it
2788          * causes the following problem if setup (connect) and cleanup
2789          * (disconnect) are tangled together.
2790          *      connect p1                     disconnect p2
2791          *   ptlrpc_connect_import
2792          *     ...............               class_manual_cleanup
2793          *                                     osc_disconnect
2794          *                                     del_shrink_grant
2795          *   ptlrpc_connect_interrupt
2796          *     init_grant_shrink
2797          *   add this client to shrink list
2798          *                                      cleanup_osc
2799          * Bang! pinger trigger the shrink.
2800          * So the osc should be disconnected from the shrink list, after we
2801          * are sure the import has been destroyed. BUG18662
2802          */
2803         if (obd->u.cli.cl_import == NULL)
2804                 osc_del_shrink_grant(&obd->u.cli);
2805         return rc;
2806 }
2807
2808 static int osc_import_event(struct obd_device *obd,
2809                             struct obd_import *imp,
2810                             enum obd_import_event event)
2811 {
2812         struct client_obd *cli;
2813         int rc = 0;
2814
2815         ENTRY;
2816         LASSERT(imp->imp_obd == obd);
2817
2818         switch (event) {
2819         case IMP_EVENT_DISCON: {
2820                 cli = &obd->u.cli;
2821                 client_obd_list_lock(&cli->cl_loi_list_lock);
2822                 cli->cl_avail_grant = 0;
2823                 cli->cl_lost_grant = 0;
2824                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2825                 break;
2826         }
2827         case IMP_EVENT_INACTIVE: {
2828                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2829                 break;
2830         }
2831         case IMP_EVENT_INVALIDATE: {
2832                 struct ldlm_namespace *ns = obd->obd_namespace;
2833                 struct lu_env         *env;
2834                 int                    refcheck;
2835
2836                 env = cl_env_get(&refcheck);
2837                 if (!IS_ERR(env)) {
2838                         /* Reset grants */
2839                         cli = &obd->u.cli;
2840                         /* all pages go to failing rpcs due to the invalid
2841                          * import */
2842                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
2843
2844                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2845                         cl_env_put(env, &refcheck);
2846                 } else
2847                         rc = PTR_ERR(env);
2848                 break;
2849         }
2850         case IMP_EVENT_ACTIVE: {
2851                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2852                 break;
2853         }
2854         case IMP_EVENT_OCD: {
2855                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2856
2857                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2858                         osc_init_grant(&obd->u.cli, ocd);
2859
2860                 /* See bug 7198 */
2861                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2862                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2863
2864                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2865                 break;
2866         }
2867         case IMP_EVENT_DEACTIVATE: {
2868                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2869                 break;
2870         }
2871         case IMP_EVENT_ACTIVATE: {
2872                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2873                 break;
2874         }
2875         default:
2876                 CERROR("Unknown import event %d\n", event);
2877                 LBUG();
2878         }
2879         RETURN(rc);
2880 }
2881
2882 /**
2883  * Determine whether the lock can be canceled before replaying the lock
2884  * during recovery, see bug16774 for detailed information.
2885  *
2886  * \retval zero the lock can't be canceled
2887  * \retval other ok to cancel
2888  */
2889 static int osc_cancel_weight(struct ldlm_lock *lock)
2890 {
2891         /*
2892          * Cancel all unused and granted extent lock.
2893          */
2894         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2895             lock->l_granted_mode == lock->l_req_mode &&
2896             osc_ldlm_weigh_ast(lock) == 0)
2897                 RETURN(1);
2898
2899         RETURN(0);
2900 }
2901
2902 static int brw_queue_work(const struct lu_env *env, void *data)
2903 {
2904         struct client_obd *cli = data;
2905
2906         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2907
2908         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2909         RETURN(0);
2910 }
2911
2912 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2913 {
2914         struct client_obd *cli = &obd->u.cli;
2915         struct obd_type   *type;
2916         void              *handler;
2917         int                rc;
2918         ENTRY;
2919
2920         rc = ptlrpcd_addref();
2921         if (rc)
2922                 RETURN(rc);
2923
2924         rc = client_obd_setup(obd, lcfg);
2925         if (rc)
2926                 GOTO(out_ptlrpcd, rc);
2927
2928         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2929         if (IS_ERR(handler))
2930                 GOTO(out_client_setup, rc = PTR_ERR(handler));
2931         cli->cl_writeback_work = handler;
2932
2933         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2934         if (IS_ERR(handler))
2935                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2936         cli->cl_lru_work = handler;
2937
2938         rc = osc_quota_setup(obd);
2939         if (rc)
2940                 GOTO(out_ptlrpcd_work, rc);
2941
2942         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2943
2944 #ifdef LPROCFS
2945         obd->obd_vars = lprocfs_osc_obd_vars;
2946 #endif
2947         /* If this is true then both client (osc) and server (osp) are on the
2948          * same node. The osp layer if loaded first will register the osc proc
2949          * directory. In that case this obd_device will be attached its proc
2950          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2951         type = class_search_type(LUSTRE_OSP_NAME);
2952         if (type && type->typ_procsym) {
2953                 obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name,
2954                                                            type->typ_procsym,
2955                                                            obd->obd_vars, obd);
2956                 if (IS_ERR(obd->obd_proc_entry)) {
2957                         rc = PTR_ERR(obd->obd_proc_entry);
2958                         CERROR("error %d setting up lprocfs for %s\n", rc,
2959                                obd->obd_name);
2960                         obd->obd_proc_entry = NULL;
2961                 }
2962         } else {
2963                 rc = lprocfs_obd_setup(obd);
2964         }
2965
2966         /* If the basic OSC proc tree construction succeeded then
2967          * lets do the rest. */
2968         if (rc == 0) {
2969                 lproc_osc_attach_seqstat(obd);
2970                 sptlrpc_lprocfs_cliobd_attach(obd);
2971                 ptlrpc_lprocfs_register_obd(obd);
2972         }
2973
2974         /* We need to allocate a few requests more, because
2975          * brw_interpret tries to create new requests before freeing
2976          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
2977          * reserved, but I'm afraid that might be too much wasted RAM
2978          * in fact, so 2 is just my guess and still should work. */
2979         cli->cl_import->imp_rq_pool =
2980                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
2981                                     OST_MAXREQSIZE,
2982                                     ptlrpc_add_rqs_to_pool);
2983
2984         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2985         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2986         RETURN(0);
2987
2988 out_ptlrpcd_work:
2989         if (cli->cl_writeback_work != NULL) {
2990                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2991                 cli->cl_writeback_work = NULL;
2992         }
2993         if (cli->cl_lru_work != NULL) {
2994                 ptlrpcd_destroy_work(cli->cl_lru_work);
2995                 cli->cl_lru_work = NULL;
2996         }
2997 out_client_setup:
2998         client_obd_cleanup(obd);
2999 out_ptlrpcd:
3000         ptlrpcd_decref();
3001         RETURN(rc);
3002 }
3003
3004 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3005 {
3006         int rc = 0;
3007         ENTRY;
3008
3009         switch (stage) {
3010         case OBD_CLEANUP_EARLY: {
3011                 struct obd_import *imp;
3012                 imp = obd->u.cli.cl_import;
3013                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3014                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3015                 ptlrpc_deactivate_import(imp);
3016                 spin_lock(&imp->imp_lock);
3017                 imp->imp_pingable = 0;
3018                 spin_unlock(&imp->imp_lock);
3019                 break;
3020         }
3021         case OBD_CLEANUP_EXPORTS: {
3022                 struct client_obd *cli = &obd->u.cli;
3023                 /* LU-464
3024                  * for echo client, export may be on zombie list, wait for
3025                  * zombie thread to cull it, because cli.cl_import will be
3026                  * cleared in client_disconnect_export():
3027                  *   class_export_destroy() -> obd_cleanup() ->
3028                  *   echo_device_free() -> echo_client_cleanup() ->
3029                  *   obd_disconnect() -> osc_disconnect() ->
3030                  *   client_disconnect_export()
3031                  */
3032                 obd_zombie_barrier();
3033                 if (cli->cl_writeback_work) {
3034                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3035                         cli->cl_writeback_work = NULL;
3036                 }
3037                 if (cli->cl_lru_work) {
3038                         ptlrpcd_destroy_work(cli->cl_lru_work);
3039                         cli->cl_lru_work = NULL;
3040                 }
3041                 obd_cleanup_client_import(obd);
3042                 ptlrpc_lprocfs_unregister_obd(obd);
3043                 lprocfs_obd_cleanup(obd);
3044                 break;
3045                 }
3046         }
3047         RETURN(rc);
3048 }
3049
3050 int osc_cleanup(struct obd_device *obd)
3051 {
3052         struct client_obd *cli = &obd->u.cli;
3053         int rc;
3054
3055         ENTRY;
3056
3057         /* lru cleanup */
3058         if (cli->cl_cache != NULL) {
3059                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3060                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3061                 list_del_init(&cli->cl_lru_osc);
3062                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3063                 cli->cl_lru_left = NULL;
3064                 atomic_dec(&cli->cl_cache->ccc_users);
3065                 cli->cl_cache = NULL;
3066         }
3067
3068         /* free memory of osc quota cache */
3069         osc_quota_cleanup(obd);
3070
3071         rc = client_obd_cleanup(obd);
3072
3073         ptlrpcd_decref();
3074         RETURN(rc);
3075 }
3076
3077 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3078 {
3079         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3080         return rc > 0 ? 0: rc;
3081 }
3082
3083 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3084 {
3085         return osc_process_config_base(obd, buf);
3086 }
3087
3088 struct obd_ops osc_obd_ops = {
3089         .o_owner                = THIS_MODULE,
3090         .o_setup                = osc_setup,
3091         .o_precleanup           = osc_precleanup,
3092         .o_cleanup              = osc_cleanup,
3093         .o_add_conn             = client_import_add_conn,
3094         .o_del_conn             = client_import_del_conn,
3095         .o_connect              = client_connect_import,
3096         .o_reconnect            = osc_reconnect,
3097         .o_disconnect           = osc_disconnect,
3098         .o_statfs               = osc_statfs,
3099         .o_statfs_async         = osc_statfs_async,
3100         .o_create               = osc_create,
3101         .o_destroy              = osc_destroy,
3102         .o_getattr              = osc_getattr,
3103         .o_getattr_async        = osc_getattr_async,
3104         .o_setattr              = osc_setattr,
3105         .o_setattr_async        = osc_setattr_async,
3106         .o_change_cbdata        = osc_change_cbdata,
3107         .o_find_cbdata          = osc_find_cbdata,
3108         .o_iocontrol            = osc_iocontrol,
3109         .o_get_info             = osc_get_info,
3110         .o_set_info_async       = osc_set_info_async,
3111         .o_import_event         = osc_import_event,
3112         .o_process_config       = osc_process_config,
3113         .o_quotactl             = osc_quotactl,
3114         .o_quotacheck           = osc_quotacheck,
3115 };
3116
3117 extern struct lu_kmem_descr osc_caches[];
3118 extern struct lock_class_key osc_ast_guard_class;
3119
3120 int __init osc_init(void)
3121 {
3122         bool enable_proc = true;
3123         struct obd_type *type;
3124         int rc;
3125         ENTRY;
3126
3127         /* print an address of _any_ initialized kernel symbol from this
3128          * module, to allow debugging with gdb that doesn't support data
3129          * symbols from modules.*/
3130         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3131
3132         rc = lu_kmem_init(osc_caches);
3133         if (rc)
3134                 RETURN(rc);
3135
3136         type = class_search_type(LUSTRE_OSP_NAME);
3137         if (type != NULL && type->typ_procsym != NULL)
3138                 enable_proc = false;
3139
3140         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3141                                  LUSTRE_OSC_NAME, &osc_device_type);
3142         if (rc) {
3143                 lu_kmem_fini(osc_caches);
3144                 RETURN(rc);
3145         }
3146
3147         RETURN(rc);
3148 }
3149
3150 static void /*__exit*/ osc_exit(void)
3151 {
3152         class_unregister_type(LUSTRE_OSC_NAME);
3153         lu_kmem_fini(osc_caches);
3154 }
3155
3156 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3157 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3158 MODULE_LICENSE("GPL");
3159
3160 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);