Whamcloud - gitweb
LU-6042 osc: osc_object_ast_clear() LBUG
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
54
55 struct osc_brw_async_args {
56         struct obdo              *aa_oa;
57         int                       aa_requested_nob;
58         int                       aa_nio_count;
59         obd_count                 aa_page_count;
60         int                       aa_resends;
61         struct brw_page **aa_ppga;
62         struct client_obd        *aa_cli;
63         struct list_head          aa_oaps;
64         struct list_head          aa_exts;
65         struct obd_capa  *aa_ocapa;
66         struct cl_req            *aa_clerq;
67 };
68
69 #define osc_grant_args osc_brw_async_args
70
/*
 * Async arguments for OST_GETATTR: the caller's obd_info, whose obdo is
 * filled from the reply and whose oi_cb_up callback receives the result.
 */
struct osc_async_args {
        struct obd_info    *aa_oi;
};
74
75 struct osc_setattr_args {
76         struct obdo             *sa_oa;
77         obd_enqueue_update_f     sa_upcall;
78         void                    *sa_cookie;
79 };
80
81 struct osc_fsync_args {
82         struct obd_info *fa_oi;
83         obd_enqueue_update_f     fa_upcall;
84         void                    *fa_cookie;
85 };
86
87 struct osc_enqueue_args {
88         struct obd_export       *oa_exp;
89         ldlm_type_t             oa_type;
90         ldlm_mode_t             oa_mode;
91         __u64                   *oa_flags;
92         osc_enqueue_upcall_f    oa_upcall;
93         void                    *oa_cookie;
94         struct ost_lvb          *oa_lvb;
95         struct lustre_handle    oa_lockh;
96         unsigned int            oa_agl:1;
97 };
98
99 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
100 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
101                          void *data, int rc);
102
103 static inline void osc_pack_capa(struct ptlrpc_request *req,
104                                  struct ost_body *body, void *capa)
105 {
106         struct obd_capa *oc = (struct obd_capa *)capa;
107         struct lustre_capa *c;
108
109         if (!capa)
110                 return;
111
112         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
113         LASSERT(c);
114         capa_cpy(c, oc);
115         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
116         DEBUG_CAPA(D_SEC, c, "pack");
117 }
118
119 static inline void osc_pack_req_body(struct ptlrpc_request *req,
120                                      struct obd_info *oinfo)
121 {
122         struct ost_body *body;
123
124         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
125         LASSERT(body);
126
127         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
128                              oinfo->oi_oa);
129         osc_pack_capa(req, body, oinfo->oi_capa);
130 }
131
132 static inline void osc_set_capa_size(struct ptlrpc_request *req,
133                                      const struct req_msg_field *field,
134                                      struct obd_capa *oc)
135 {
136         if (oc == NULL)
137                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
138         else
139                 /* it is already calculated as sizeof struct obd_capa */
140                 ;
141 }
142
143 static int osc_getattr_interpret(const struct lu_env *env,
144                                  struct ptlrpc_request *req,
145                                  struct osc_async_args *aa, int rc)
146 {
147         struct ost_body *body;
148         ENTRY;
149
150         if (rc != 0)
151                 GOTO(out, rc);
152
153         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
154         if (body) {
155                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
156                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
157                                      aa->aa_oi->oi_oa, &body->oa);
158
159                 /* This should really be sent by the OST */
160                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
161                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
162         } else {
163                 CDEBUG(D_INFO, "can't unpack ost_body\n");
164                 rc = -EPROTO;
165                 aa->aa_oi->oi_oa->o_valid = 0;
166         }
167 out:
168         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
169         RETURN(rc);
170 }
171
172 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
173                              struct ptlrpc_request_set *set)
174 {
175         struct ptlrpc_request *req;
176         struct osc_async_args *aa;
177         int                    rc;
178         ENTRY;
179
180         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
181         if (req == NULL)
182                 RETURN(-ENOMEM);
183
184         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
185         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
186         if (rc) {
187                 ptlrpc_request_free(req);
188                 RETURN(rc);
189         }
190
191         osc_pack_req_body(req, oinfo);
192
193         ptlrpc_request_set_replen(req);
194         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
195
196         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
197         aa = ptlrpc_req_async_args(req);
198         aa->aa_oi = oinfo;
199
200         ptlrpc_set_add_req(set, req);
201         RETURN(0);
202 }
203
204 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
205                        struct obd_info *oinfo)
206 {
207         struct ptlrpc_request *req;
208         struct ost_body       *body;
209         int                    rc;
210         ENTRY;
211
212         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
213         if (req == NULL)
214                 RETURN(-ENOMEM);
215
216         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
217         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
218         if (rc) {
219                 ptlrpc_request_free(req);
220                 RETURN(rc);
221         }
222
223         osc_pack_req_body(req, oinfo);
224
225         ptlrpc_request_set_replen(req);
226
227         rc = ptlrpc_queue_wait(req);
228         if (rc)
229                 GOTO(out, rc);
230
231         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
232         if (body == NULL)
233                 GOTO(out, rc = -EPROTO);
234
235         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
236         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
237                              &body->oa);
238
239         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
240         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
241
242         EXIT;
243  out:
244         ptlrpc_req_finished(req);
245         return rc;
246 }
247
248 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
249                        struct obd_info *oinfo, struct obd_trans_info *oti)
250 {
251         struct ptlrpc_request *req;
252         struct ost_body       *body;
253         int                    rc;
254         ENTRY;
255
256         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
257
258         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
259         if (req == NULL)
260                 RETURN(-ENOMEM);
261
262         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
263         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
264         if (rc) {
265                 ptlrpc_request_free(req);
266                 RETURN(rc);
267         }
268
269         osc_pack_req_body(req, oinfo);
270
271         ptlrpc_request_set_replen(req);
272
273         rc = ptlrpc_queue_wait(req);
274         if (rc)
275                 GOTO(out, rc);
276
277         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
278         if (body == NULL)
279                 GOTO(out, rc = -EPROTO);
280
281         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
282                              &body->oa);
283
284         EXIT;
285 out:
286         ptlrpc_req_finished(req);
287         RETURN(rc);
288 }
289
290 static int osc_setattr_interpret(const struct lu_env *env,
291                                  struct ptlrpc_request *req,
292                                  struct osc_setattr_args *sa, int rc)
293 {
294         struct ost_body *body;
295         ENTRY;
296
297         if (rc != 0)
298                 GOTO(out, rc);
299
300         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
301         if (body == NULL)
302                 GOTO(out, rc = -EPROTO);
303
304         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
305                              &body->oa);
306 out:
307         rc = sa->sa_upcall(sa->sa_cookie, rc);
308         RETURN(rc);
309 }
310
311 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
312                            struct obd_trans_info *oti,
313                            obd_enqueue_update_f upcall, void *cookie,
314                            struct ptlrpc_request_set *rqset)
315 {
316         struct ptlrpc_request   *req;
317         struct osc_setattr_args *sa;
318         int                      rc;
319         ENTRY;
320
321         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
322         if (req == NULL)
323                 RETURN(-ENOMEM);
324
325         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
326         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
327         if (rc) {
328                 ptlrpc_request_free(req);
329                 RETURN(rc);
330         }
331
332         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
333                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
334
335         osc_pack_req_body(req, oinfo);
336
337         ptlrpc_request_set_replen(req);
338
339         /* do mds to ost setattr asynchronously */
340         if (!rqset) {
341                 /* Do not wait for response. */
342                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
343         } else {
344                 req->rq_interpret_reply =
345                         (ptlrpc_interpterer_t)osc_setattr_interpret;
346
347                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
348                 sa = ptlrpc_req_async_args(req);
349                 sa->sa_oa = oinfo->oi_oa;
350                 sa->sa_upcall = upcall;
351                 sa->sa_cookie = cookie;
352
353                 if (rqset == PTLRPCD_SET)
354                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
355                 else
356                         ptlrpc_set_add_req(rqset, req);
357         }
358
359         RETURN(0);
360 }
361
362 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
363                              struct obd_trans_info *oti,
364                              struct ptlrpc_request_set *rqset)
365 {
366         return osc_setattr_async_base(exp, oinfo, oti,
367                                       oinfo->oi_cb_up, oinfo, rqset);
368 }
369
370 static int osc_create(const struct lu_env *env, struct obd_export *exp,
371                       struct obdo *oa, struct obd_trans_info *oti)
372 {
373         struct ptlrpc_request *req;
374         struct ost_body       *body;
375         int                    rc;
376         ENTRY;
377
378         LASSERT(oa != NULL);
379         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
380         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
381
382         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
383         if (req == NULL)
384                 GOTO(out, rc = -ENOMEM);
385
386         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
387         if (rc) {
388                 ptlrpc_request_free(req);
389                 GOTO(out, rc);
390         }
391
392         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
393         LASSERT(body);
394
395         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
396
397         ptlrpc_request_set_replen(req);
398
399         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
400             oa->o_flags == OBD_FL_DELORPHAN) {
401                 DEBUG_REQ(D_HA, req,
402                           "delorphan from OST integration");
403                 /* Don't resend the delorphan req */
404                 req->rq_no_resend = req->rq_no_delay = 1;
405         }
406
407         rc = ptlrpc_queue_wait(req);
408         if (rc)
409                 GOTO(out_req, rc);
410
411         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
412         if (body == NULL)
413                 GOTO(out_req, rc = -EPROTO);
414
415         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
416         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
417
418         oa->o_blksize = cli_brw_size(exp->exp_obd);
419         oa->o_valid |= OBD_MD_FLBLKSZ;
420
421         if (oti != NULL) {
422                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
423                         if (oti->oti_logcookies == NULL)
424                                 oti->oti_logcookies = &oti->oti_onecookie;
425
426                         *oti->oti_logcookies = oa->o_lcookie;
427                 }
428         }
429
430         CDEBUG(D_HA, "transno: "LPD64"\n",
431                lustre_msg_get_transno(req->rq_repmsg));
432 out_req:
433         ptlrpc_req_finished(req);
434 out:
435         RETURN(rc);
436 }
437
438 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
439                    obd_enqueue_update_f upcall, void *cookie,
440                    struct ptlrpc_request_set *rqset)
441 {
442         struct ptlrpc_request   *req;
443         struct osc_setattr_args *sa;
444         struct ost_body         *body;
445         int                      rc;
446         ENTRY;
447
448         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
449         if (req == NULL)
450                 RETURN(-ENOMEM);
451
452         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
453         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
454         if (rc) {
455                 ptlrpc_request_free(req);
456                 RETURN(rc);
457         }
458         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
459         ptlrpc_at_set_req_timeout(req);
460
461         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
462         LASSERT(body);
463         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
464                              oinfo->oi_oa);
465         osc_pack_capa(req, body, oinfo->oi_capa);
466
467         ptlrpc_request_set_replen(req);
468
469         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
470         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
471         sa = ptlrpc_req_async_args(req);
472         sa->sa_oa     = oinfo->oi_oa;
473         sa->sa_upcall = upcall;
474         sa->sa_cookie = cookie;
475         if (rqset == PTLRPCD_SET)
476                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
477         else
478                 ptlrpc_set_add_req(rqset, req);
479
480         RETURN(0);
481 }
482
483 static int osc_sync_interpret(const struct lu_env *env,
484                               struct ptlrpc_request *req,
485                               void *arg, int rc)
486 {
487         struct osc_fsync_args *fa = arg;
488         struct ost_body *body;
489         ENTRY;
490
491         if (rc)
492                 GOTO(out, rc);
493
494         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
495         if (body == NULL) {
496                 CERROR ("can't unpack ost_body\n");
497                 GOTO(out, rc = -EPROTO);
498         }
499
500         *fa->fa_oi->oi_oa = body->oa;
501 out:
502         rc = fa->fa_upcall(fa->fa_cookie, rc);
503         RETURN(rc);
504 }
505
506 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
507                   obd_enqueue_update_f upcall, void *cookie,
508                   struct ptlrpc_request_set *rqset)
509 {
510         struct ptlrpc_request *req;
511         struct ost_body       *body;
512         struct osc_fsync_args *fa;
513         int                    rc;
514         ENTRY;
515
516         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
517         if (req == NULL)
518                 RETURN(-ENOMEM);
519
520         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
521         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
522         if (rc) {
523                 ptlrpc_request_free(req);
524                 RETURN(rc);
525         }
526
527         /* overload the size and blocks fields in the oa with start/end */
528         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
529         LASSERT(body);
530         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
531                              oinfo->oi_oa);
532         osc_pack_capa(req, body, oinfo->oi_capa);
533
534         ptlrpc_request_set_replen(req);
535         req->rq_interpret_reply = osc_sync_interpret;
536
537         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
538         fa = ptlrpc_req_async_args(req);
539         fa->fa_oi = oinfo;
540         fa->fa_upcall = upcall;
541         fa->fa_cookie = cookie;
542
543         if (rqset == PTLRPCD_SET)
544                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
545         else
546                 ptlrpc_set_add_req(rqset, req);
547
548         RETURN (0);
549 }
550
551 /* Find and cancel locally locks matched by @mode in the resource found by
552  * @objid. Found locks are added into @cancel list. Returns the amount of
553  * locks added to @cancels list. */
554 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
555                                    struct list_head *cancels,
556                                    ldlm_mode_t mode, __u64 lock_flags)
557 {
558         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
559         struct ldlm_res_id res_id;
560         struct ldlm_resource *res;
561         int count;
562         ENTRY;
563
564         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
565          * export) but disabled through procfs (flag in NS).
566          *
567          * This distinguishes from a case when ELC is not supported originally,
568          * when we still want to cancel locks in advance and just cancel them
569          * locally, without sending any RPC. */
570         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
571                 RETURN(0);
572
573         ostid_build_res_name(&oa->o_oi, &res_id);
574         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
575         if (IS_ERR(res))
576                 RETURN(0);
577
578         LDLM_RESOURCE_ADDREF(res);
579         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
580                                            lock_flags, 0, NULL);
581         LDLM_RESOURCE_DELREF(res);
582         ldlm_resource_putref(res);
583         RETURN(count);
584 }
585
586 static int osc_destroy_interpret(const struct lu_env *env,
587                                  struct ptlrpc_request *req, void *data,
588                                  int rc)
589 {
590         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
591
592         atomic_dec(&cli->cl_destroy_in_flight);
593         wake_up(&cli->cl_destroy_waitq);
594         return 0;
595 }
596
597 static int osc_can_send_destroy(struct client_obd *cli)
598 {
599         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
600             cli->cl_max_rpcs_in_flight) {
601                 /* The destroy request can be sent */
602                 return 1;
603         }
604         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
605             cli->cl_max_rpcs_in_flight) {
606                 /*
607                  * The counter has been modified between the two atomic
608                  * operations.
609                  */
610                 wake_up(&cli->cl_destroy_waitq);
611         }
612         return 0;
613 }
614
615 /* Destroy requests can be async always on the client, and we don't even really
616  * care about the return code since the client cannot do anything at all about
617  * a destroy failure.
618  * When the MDS is unlinking a filename, it saves the file objects into a
619  * recovery llog, and these object records are cancelled when the OST reports
620  * they were destroyed and sync'd to disk (i.e. transaction committed).
621  * If the client dies, or the OST is down when the object should be destroyed,
622  * the records are not cancelled, and when the OST reconnects to the MDS next,
623  * it will retrieve the llog unlink logs and then sends the log cancellation
624  * cookies to the MDS after committing destroy transactions. */
625 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
626                        struct obdo *oa, struct obd_trans_info *oti)
627 {
628         struct client_obd     *cli = &exp->exp_obd->u.cli;
629         struct ptlrpc_request *req;
630         struct ost_body       *body;
631         struct list_head       cancels = LIST_HEAD_INIT(cancels);
632         int rc, count;
633         ENTRY;
634
635         if (!oa) {
636                 CDEBUG(D_INFO, "oa NULL\n");
637                 RETURN(-EINVAL);
638         }
639
640         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
641                                         LDLM_FL_DISCARD_DATA);
642
643         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
644         if (req == NULL) {
645                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
646                 RETURN(-ENOMEM);
647         }
648
649         osc_set_capa_size(req, &RMF_CAPA1, NULL);
650         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
651                                0, &cancels, count);
652         if (rc) {
653                 ptlrpc_request_free(req);
654                 RETURN(rc);
655         }
656
657         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
658         ptlrpc_at_set_req_timeout(req);
659
660         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
661                 oa->o_lcookie = *oti->oti_logcookies;
662         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
663         LASSERT(body);
664         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
665
666         ptlrpc_request_set_replen(req);
667
668         /* If osc_destory is for destroying the unlink orphan,
669          * sent from MDT to OST, which should not be blocked here,
670          * because the process might be triggered by ptlrpcd, and
671          * it is not good to block ptlrpcd thread (b=16006)*/
672         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
673                 req->rq_interpret_reply = osc_destroy_interpret;
674                 if (!osc_can_send_destroy(cli)) {
675                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
676                                                           NULL);
677
678                         /*
679                          * Wait until the number of on-going destroy RPCs drops
680                          * under max_rpc_in_flight
681                          */
682                         l_wait_event_exclusive(cli->cl_destroy_waitq,
683                                                osc_can_send_destroy(cli), &lwi);
684                 }
685         }
686
687         /* Do not wait for response */
688         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
689         RETURN(0);
690 }
691
692 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
693                                 long writing_bytes)
694 {
695         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
696
697         LASSERT(!(oa->o_valid & bits));
698
699         oa->o_valid |= bits;
700         spin_lock(&cli->cl_loi_list_lock);
701         oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
702         if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
703                      cli->cl_dirty_max_pages)) {
704                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
705                        cli->cl_dirty_pages, cli->cl_dirty_transit,
706                        cli->cl_dirty_max_pages);
707                 oa->o_undirty = 0;
708         } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
709                             atomic_long_read(&obd_dirty_transit_pages) >
710                             (obd_max_dirty_pages + 1))) {
711                 /* The atomic_read() allowing the atomic_inc() are
712                  * not covered by a lock thus they may safely race and trip
713                  * this CERROR() unless we add in a small fudge factor (+1). */
714                 CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
715                        cli->cl_import->imp_obd->obd_name,
716                        atomic_long_read(&obd_dirty_pages),
717                        atomic_long_read(&obd_dirty_transit_pages),
718                        obd_max_dirty_pages);
719                 oa->o_undirty = 0;
720         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
721                             0x7fffffff)) {
722                 CERROR("dirty %lu - dirty_max %lu too big???\n",
723                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
724                 oa->o_undirty = 0;
725         } else {
726                 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
727                                       PAGE_CACHE_SHIFT) *
728                                      (cli->cl_max_rpcs_in_flight + 1);
729                 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
730                                     max_in_flight);
731         }
732         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
733         oa->o_dropped = cli->cl_lost_grant;
734         cli->cl_lost_grant = 0;
735         spin_unlock(&cli->cl_loi_list_lock);
736         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
737                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
738
739 }
740
741 void osc_update_next_shrink(struct client_obd *cli)
742 {
743         cli->cl_next_shrink_grant =
744                 cfs_time_shift(cli->cl_grant_shrink_interval);
745         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
746                cli->cl_next_shrink_grant);
747 }
748
749 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
750 {
751         spin_lock(&cli->cl_loi_list_lock);
752         cli->cl_avail_grant += grant;
753         spin_unlock(&cli->cl_loi_list_lock);
754 }
755
756 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
757 {
758         if (body->oa.o_valid & OBD_MD_FLGRANT) {
759                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
760                 __osc_update_grant(cli, body->oa.o_grant);
761         }
762 }
763
764 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
765                               obd_count keylen, void *key, obd_count vallen,
766                               void *val, struct ptlrpc_request_set *set);
767
768 static int osc_shrink_grant_interpret(const struct lu_env *env,
769                                       struct ptlrpc_request *req,
770                                       void *aa, int rc)
771 {
772         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
773         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
774         struct ost_body *body;
775
776         if (rc != 0) {
777                 __osc_update_grant(cli, oa->o_grant);
778                 GOTO(out, rc);
779         }
780
781         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
782         LASSERT(body);
783         osc_update_grant(cli, body);
784 out:
785         OBDO_FREE(oa);
786         return rc;
787 }
788
789 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
790 {
791         spin_lock(&cli->cl_loi_list_lock);
792         oa->o_grant = cli->cl_avail_grant / 4;
793         cli->cl_avail_grant -= oa->o_grant;
794         spin_unlock(&cli->cl_loi_list_lock);
795         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
796                 oa->o_valid |= OBD_MD_FLFLAGS;
797                 oa->o_flags = 0;
798         }
799         oa->o_flags |= OBD_FL_SHRINK_GRANT;
800         osc_update_next_shrink(cli);
801 }
802
803 /* Shrink the current grant, either from some large amount to enough for a
804  * full set of in-flight RPCs, or if we have already shrunk to that limit
805  * then to enough for a single RPC.  This avoids keeping more grant than
806  * needed, and avoids shrinking the grant piecemeal. */
807 static int osc_shrink_grant(struct client_obd *cli)
808 {
809         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
810                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
811
812         spin_lock(&cli->cl_loi_list_lock);
813         if (cli->cl_avail_grant <= target_bytes)
814                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
815         spin_unlock(&cli->cl_loi_list_lock);
816
817         return osc_shrink_grant_to_target(cli, target_bytes);
818 }
819
820 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
821 {
822         int                     rc = 0;
823         struct ost_body        *body;
824         ENTRY;
825
826         spin_lock(&cli->cl_loi_list_lock);
827         /* Don't shrink if we are already above or below the desired limit
828          * We don't want to shrink below a single RPC, as that will negatively
829          * impact block allocation and long-term performance. */
830         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
831                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
832
833         if (target_bytes >= cli->cl_avail_grant) {
834                 spin_unlock(&cli->cl_loi_list_lock);
835                 RETURN(0);
836         }
837         spin_unlock(&cli->cl_loi_list_lock);
838
839         OBD_ALLOC_PTR(body);
840         if (!body)
841                 RETURN(-ENOMEM);
842
843         osc_announce_cached(cli, &body->oa, 0);
844
845         spin_lock(&cli->cl_loi_list_lock);
846         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
847         cli->cl_avail_grant = target_bytes;
848         spin_unlock(&cli->cl_loi_list_lock);
849         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
850                 body->oa.o_valid |= OBD_MD_FLFLAGS;
851                 body->oa.o_flags = 0;
852         }
853         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
854         osc_update_next_shrink(cli);
855
856         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
857                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
858                                 sizeof(*body), body, NULL);
859         if (rc != 0)
860                 __osc_update_grant(cli, body->oa.o_grant);
861         OBD_FREE_PTR(body);
862         RETURN(rc);
863 }
864
865 static int osc_should_shrink_grant(struct client_obd *client)
866 {
867         cfs_time_t time = cfs_time_current();
868         cfs_time_t next_shrink = client->cl_next_shrink_grant;
869
870         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
871              OBD_CONNECT_GRANT_SHRINK) == 0)
872                 return 0;
873
874         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
875                 /* Get the current RPC size directly, instead of going via:
876                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
877                  * Keep comment here so that it can be found by searching. */
878                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
879
880                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
881                     client->cl_avail_grant > brw_size)
882                         return 1;
883                 else
884                         osc_update_next_shrink(client);
885         }
886         return 0;
887 }
888
889 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
890 {
891         struct client_obd *client;
892
893         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
894                 if (osc_should_shrink_grant(client))
895                         osc_shrink_grant(client);
896         }
897         return 0;
898 }
899
900 static int osc_add_shrink_grant(struct client_obd *client)
901 {
902         int rc;
903
904         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
905                                        TIMEOUT_GRANT,
906                                        osc_grant_shrink_grant_cb, NULL,
907                                        &client->cl_grant_shrink_list);
908         if (rc) {
909                 CERROR("add grant client %s error %d\n",
910                         client->cl_import->imp_obd->obd_name, rc);
911                 return rc;
912         }
913         CDEBUG(D_CACHE, "add grant client %s \n",
914                client->cl_import->imp_obd->obd_name);
915         osc_update_next_shrink(client);
916         return 0;
917 }
918
919 static int osc_del_shrink_grant(struct client_obd *client)
920 {
921         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
922                                          TIMEOUT_GRANT);
923 }
924
925 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
926 {
927         /*
928          * ocd_grant is the total grant amount we're expect to hold: if we've
929          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
930          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
931          * dirty.
932          *
933          * race is tolerable here: if we're evicted, but imp_state already
934          * left EVICTED state, then cl_dirty_pages must be 0 already.
935          */
936         spin_lock(&cli->cl_loi_list_lock);
937         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
938                 cli->cl_avail_grant = ocd->ocd_grant;
939         else
940                 cli->cl_avail_grant = ocd->ocd_grant -
941                                       (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
942
943         if (cli->cl_avail_grant < 0) {
944                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
945                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
946                       ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
947                 /* workaround for servers which do not have the patch from
948                  * LU-2679 */
949                 cli->cl_avail_grant = ocd->ocd_grant;
950         }
951
952         /* determine the appropriate chunk size used by osc_extent. */
953         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
954         spin_unlock(&cli->cl_loi_list_lock);
955
956         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
957                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
958                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
959
960         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
961             list_empty(&cli->cl_grant_shrink_list))
962                 osc_add_shrink_grant(cli);
963 }
964
965 /* We assume that the reason this OSC got a short read is because it read
966  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
967  * via the LOV, and it _knows_ it's reading inside the file, it's just that
968  * this stripe never got written at or beyond this stripe offset yet. */
969 static void handle_short_read(int nob_read, obd_count page_count,
970                               struct brw_page **pga)
971 {
972         char *ptr;
973         int i = 0;
974
975         /* skip bytes read OK */
976         while (nob_read > 0) {
977                 LASSERT (page_count > 0);
978
979                 if (pga[i]->count > nob_read) {
980                         /* EOF inside this page */
981                         ptr = kmap(pga[i]->pg) +
982                                 (pga[i]->off & ~CFS_PAGE_MASK);
983                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
984                         kunmap(pga[i]->pg);
985                         page_count--;
986                         i++;
987                         break;
988                 }
989
990                 nob_read -= pga[i]->count;
991                 page_count--;
992                 i++;
993         }
994
995         /* zero remaining pages */
996         while (page_count-- > 0) {
997                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
998                 memset(ptr, 0, pga[i]->count);
999                 kunmap(pga[i]->pg);
1000                 i++;
1001         }
1002 }
1003
1004 static int check_write_rcs(struct ptlrpc_request *req,
1005                            int requested_nob, int niocount,
1006                            obd_count page_count, struct brw_page **pga)
1007 {
1008         int     i;
1009         __u32   *remote_rcs;
1010
1011         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1012                                                   sizeof(*remote_rcs) *
1013                                                   niocount);
1014         if (remote_rcs == NULL) {
1015                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1016                 return(-EPROTO);
1017         }
1018
1019         /* return error if any niobuf was in error */
1020         for (i = 0; i < niocount; i++) {
1021                 if ((int)remote_rcs[i] < 0)
1022                         return(remote_rcs[i]);
1023
1024                 if (remote_rcs[i] != 0) {
1025                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1026                                 i, remote_rcs[i], req);
1027                         return(-EPROTO);
1028                 }
1029         }
1030
1031         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1032                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1033                        req->rq_bulk->bd_nob_transferred, requested_nob);
1034                 return(-EPROTO);
1035         }
1036
1037         return (0);
1038 }
1039
1040 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1041 {
1042         if (p1->flag != p2->flag) {
1043                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1044                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1045                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1046
1047                 /* warn if we try to combine flags that we don't know to be
1048                  * safe to combine */
1049                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1050                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1051                               "report this at https://jira.hpdd.intel.com/\n",
1052                               p1->flag, p2->flag);
1053                 }
1054                 return 0;
1055         }
1056
1057         return (p1->off + p1->count == p2->off);
1058 }
1059
1060 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1061                                    struct brw_page **pga, int opc,
1062                                    cksum_type_t cksum_type)
1063 {
1064         __u32                           cksum;
1065         int                             i = 0;
1066         struct cfs_crypto_hash_desc     *hdesc;
1067         unsigned int                    bufsize;
1068         int                             err;
1069         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1070
1071         LASSERT(pg_count > 0);
1072
1073         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1074         if (IS_ERR(hdesc)) {
1075                 CERROR("Unable to initialize checksum hash %s\n",
1076                        cfs_crypto_hash_name(cfs_alg));
1077                 return PTR_ERR(hdesc);
1078         }
1079
1080         while (nob > 0 && pg_count > 0) {
1081                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1082
1083                 /* corrupt the data before we compute the checksum, to
1084                  * simulate an OST->client data error */
1085                 if (i == 0 && opc == OST_READ &&
1086                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1087                         unsigned char *ptr = kmap(pga[i]->pg);
1088                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1089
1090                         memcpy(ptr + off, "bad1", min(4, nob));
1091                         kunmap(pga[i]->pg);
1092                 }
1093                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1094                                             pga[i]->off & ~CFS_PAGE_MASK,
1095                                             count);
1096                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1097                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1098
1099                 nob -= pga[i]->count;
1100                 pg_count--;
1101                 i++;
1102         }
1103
1104         bufsize = sizeof(cksum);
1105         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1106
1107         /* For sending we only compute the wrong checksum instead
1108          * of corrupting the data so it is still correct on a redo */
1109         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1110                 cksum++;
1111
1112         return cksum;
1113 }
1114
/*
 * Build (but do not send) a bulk read or write RPC for @page_count pages.
 *
 * \param cmd         OBD_BRW_WRITE selects OST_WRITE, anything else OST_READ
 * \param cli         client the request is built for
 * \param oa          object attributes; copied into the request body and
 *                    remembered (by pointer) in the request's async args
 * \param lsm         not referenced by this function
 * \param page_count  number of entries in @pga, must be > 0
 * \param pga         pages to transfer, in strictly increasing offset order
 *                    with no gaps (asserted below); remembered by pointer
 * \param reqp        receives the prepared request on success
 * \param ocapa       capability to pack, may be NULL
 * \param reserve     if non-zero and @ocapa is set, a reference on @ocapa is
 *                    taken and stored in the async args
 * \param resend      if non-zero, OBD_FL_RECOV_RESEND is set in the body
 *
 * Returns 0 on success.  Returns -ENOMEM if the request or bulk descriptor
 * cannot be allocated (or on the OBD_FAIL_OSC_BRW_PREP_REQ fault injection),
 * -EINVAL on the OBD_FAIL_OSC_BRW_PREP_REQ2 fault injection, or the error
 * from ptlrpc_request_pack().
 */
1115 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1116                                 struct lov_stripe_md *lsm, obd_count page_count,
1117                                 struct brw_page **pga,
1118                                 struct ptlrpc_request **reqp,
1119                                 struct obd_capa *ocapa, int reserve,
1120                                 int resend)
1121 {
1122         struct ptlrpc_request   *req;
1123         struct ptlrpc_bulk_desc *desc;
1124         struct ost_body         *body;
1125         struct obd_ioobj        *ioobj;
1126         struct niobuf_remote    *niobuf;
1127         int niocount, i, requested_nob, opc, rc;
1128         struct osc_brw_async_args *aa;
1129         struct req_capsule      *pill;
1130         struct brw_page *pg_prev;
1131
1132         ENTRY;
1133         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1134                 RETURN(-ENOMEM); /* Recoverable */
1135         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1136                 RETURN(-EINVAL); /* Fatal */
1137
             /* writes are allocated from the import's request pool,
              * reads with a plain allocation */
1138         if ((cmd & OBD_BRW_WRITE) != 0) {
1139                 opc = OST_WRITE;
1140                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1141                                                 cli->cl_import->imp_rq_pool,
1142                                                 &RQF_OST_BRW_WRITE);
1143         } else {
1144                 opc = OST_READ;
1145                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1146         }
1147         if (req == NULL)
1148                 RETURN(-ENOMEM);
1149
             /* count the remote niobufs needed: one for the first page plus
              * one for every page that cannot be merged with its predecessor */
1150         for (niocount = i = 1; i < page_count; i++) {
1151                 if (!can_merge_pages(pga[i - 1], pga[i]))
1152                         niocount++;
1153         }
1154
             /* variable-sized request fields must be sized before packing */
1155         pill = &req->rq_pill;
1156         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1157                              sizeof(*ioobj));
1158         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1159                              niocount * sizeof(*niobuf));
1160         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1161
1162         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1163         if (rc) {
1164                 ptlrpc_request_free(req);
1165                 RETURN(rc);
1166         }
1167         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1168         ptlrpc_at_set_req_timeout(req);
1169         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1170          * retry logic */
1171         req->rq_no_retry_einprogress = 1;
1172
1173         desc = ptlrpc_prep_bulk_imp(req, page_count,
1174                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1175                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1176                 OST_BULK_PORTAL);
1177
1178         if (desc == NULL)
1179                 GOTO(out, rc = -ENOMEM);
1180         /* NB request now owns desc and will free it when it gets freed */
1181
1182         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1183         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1184         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1185         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1186
1187         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1188
1189         obdo_to_ioobj(oa, ioobj);
1190         ioobj->ioo_bufcnt = niocount;
1191         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1192          * that might be send for this request.  The actual number is decided
1193          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1194          * "max - 1" for old client compatibility sending "0", and also so the
1195          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1196         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1197         osc_pack_capa(req, body, ocapa);
1198         LASSERT(page_count > 0);
1199         pg_prev = pga[0];
             /* add every page to the bulk descriptor and fill the niobufs;
              * niobuf advances once per iteration and is stepped back when a
              * page is merged into the previous niobuf */
1200         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1201                 struct brw_page *pg = pga[i];
1202                 int poff = pg->off & ~CFS_PAGE_MASK;
1203
1204                 LASSERT(pg->count > 0);
1205                 /* make sure there is no gap in the middle of page array */
1206                 LASSERTF(page_count == 1 ||
1207                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1208                           ergo(i > 0 && i < page_count - 1,
1209                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1210                           ergo(i == page_count - 1, poff == 0)),
1211                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1212                          i, page_count, pg, pg->off, pg->count);
1213                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1214                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1215                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1216                          i, page_count,
1217                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1218                          pg_prev->pg, page_private(pg_prev->pg),
1219                          pg_prev->pg->index, pg_prev->off);
1220                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1221                         (pg->flag & OBD_BRW_SRVLOCK));
1222
1223                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1224                 requested_nob += pg->count;
1225
1226                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1227                         niobuf--;
1228                         niobuf->rnb_len += pg->count;
1229                 } else {
1230                         niobuf->rnb_offset = pg->off;
1231                         niobuf->rnb_len    = pg->count;
1232                         niobuf->rnb_flags  = pg->flag;
1233                 }
1234                 pg_prev = pg;
1235         }
1236
             /* niobuf must now sit exactly niocount entries past the start
              * of the niobuf array, matching the count computed earlier */
1237         LASSERTF((void *)(niobuf - niocount) ==
1238                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1239                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1240                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1241
1242         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1243         if (resend) {
1244                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1245                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1246                         body->oa.o_flags = 0;
1247                 }
1248                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1249         }
1250
1251         if (osc_should_shrink_grant(cli))
1252                 osc_shrink_grant_local(cli, &body->oa);
1253
1254         /* size[REQ_REC_OFF] still sizeof (*body) */
1255         if (opc == OST_WRITE) {
1256                 if (cli->cl_checksum &&
1257                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1258                         /* store cl_cksum_type in a local variable since
1259                          * it can be changed via lprocfs */
1260                         cksum_type_t cksum_type = cli->cl_cksum_type;
1261
1262                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1263                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1264                                 body->oa.o_flags = 0;
1265                         }
1266                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1267                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1268                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1269                                                              page_count, pga,
1270                                                              OST_WRITE,
1271                                                              cksum_type);
1272                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1273                                body->oa.o_cksum);
1274                         /* save this in 'oa', too, for later checking */
1275                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1276                         oa->o_flags |= cksum_type_pack(cksum_type);
1277                 } else {
1278                         /* clear out the checksum flag, in case this is a
1279                          * resend but cl_checksum is no longer set. b=11238 */
1280                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1281                 }
1282                 oa->o_cksum = body->oa.o_cksum;
1283                 /* 1 RC per niobuf */
1284                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1285                                      sizeof(__u32) * niocount);
1286         } else {
                     /* reads only ask for a checksum of the chosen type;
                      * no checksum is computed here */
1287                 if (cli->cl_checksum &&
1288                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1289                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1290                                 body->oa.o_flags = 0;
1291                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1292                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1293                 }
1294         }
1295         ptlrpc_request_set_replen(req);
1296
             /* stash what the completion side needs in the request's async
              * args; oa and pga are stored by pointer, not copied */
1297         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1298         aa = ptlrpc_req_async_args(req);
1299         aa->aa_oa = oa;
1300         aa->aa_requested_nob = requested_nob;
1301         aa->aa_nio_count = niocount;
1302         aa->aa_page_count = page_count;
1303         aa->aa_resends = 0;
1304         aa->aa_ppga = pga;
1305         aa->aa_cli = cli;
1306         INIT_LIST_HEAD(&aa->aa_oaps);
1307         if (ocapa && reserve)
1308                 aa->aa_ocapa = capa_get(ocapa);
1309
1310         *reqp = req;
1311         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1312         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1313                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1314                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1315         RETURN(0);
1316
1317  out:
1318         ptlrpc_req_finished(req);
1319         RETURN(rc);
1320 }
1321
1322 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1323                                 __u32 client_cksum, __u32 server_cksum, int nob,
1324                                 obd_count page_count, struct brw_page **pga,
1325                                 cksum_type_t client_cksum_type)
1326 {
1327         __u32 new_cksum;
1328         char *msg;
1329         cksum_type_t cksum_type;
1330
1331         if (server_cksum == client_cksum) {
1332                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1333                 return 0;
1334         }
1335
1336         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1337                                        oa->o_flags : 0);
1338         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1339                                       cksum_type);
1340
1341         if (cksum_type != client_cksum_type)
1342                 msg = "the server did not use the checksum type specified in "
1343                       "the original request - likely a protocol problem";
1344         else if (new_cksum == server_cksum)
1345                 msg = "changed on the client after we checksummed it - "
1346                       "likely false positive due to mmap IO (bug 11742)";
1347         else if (new_cksum == client_cksum)
1348                 msg = "changed in transit before arrival at OST";
1349         else
1350                 msg = "changed in transit AND doesn't match the original - "
1351                       "likely false positive due to mmap IO (bug 11742)";
1352
1353         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1354                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1355                            msg, libcfs_nid2str(peer->nid),
1356                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1357                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1358                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1359                            POSTID(&oa->o_oi), pga[0]->off,
1360                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1361         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1362                "client csum now %x\n", client_cksum, client_cksum_type,
1363                server_cksum, cksum_type, new_cksum);
1364         return 1;
1365 }
1366
1367 /* Note rc enters this function as number of bytes transferred */
1368 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1369 {
1370         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1371         const lnet_process_id_t *peer =
1372                         &req->rq_import->imp_connection->c_peer;
1373         struct client_obd *cli = aa->aa_cli;
1374         struct ost_body *body;
1375         __u32 client_cksum = 0;
1376         ENTRY;
1377
1378         if (rc < 0 && rc != -EDQUOT) {
1379                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1380                 RETURN(rc);
1381         }
1382
1383         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1384         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1385         if (body == NULL) {
1386                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1387                 RETURN(-EPROTO);
1388         }
1389
1390         /* set/clear over quota flag for a uid/gid */
1391         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1392             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1393                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1394
1395                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1396                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1397                        body->oa.o_flags);
1398                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1399         }
1400
1401         osc_update_grant(cli, body);
1402
1403         if (rc < 0)
1404                 RETURN(rc);
1405
1406         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1407                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1408
1409         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1410                 if (rc > 0) {
1411                         CERROR("Unexpected +ve rc %d\n", rc);
1412                         RETURN(-EPROTO);
1413                 }
1414                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1415
1416                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1417                         RETURN(-EAGAIN);
1418
1419                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1420                     check_write_checksum(&body->oa, peer, client_cksum,
1421                                          body->oa.o_cksum, aa->aa_requested_nob,
1422                                          aa->aa_page_count, aa->aa_ppga,
1423                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1424                         RETURN(-EAGAIN);
1425
1426                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1427                                      aa->aa_page_count, aa->aa_ppga);
1428                 GOTO(out, rc);
1429         }
1430
1431         /* The rest of this function executes only for OST_READs */
1432
1433         /* if unwrap_bulk failed, return -EAGAIN to retry */
1434         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1435         if (rc < 0)
1436                 GOTO(out, rc = -EAGAIN);
1437
1438         if (rc > aa->aa_requested_nob) {
1439                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1440                        aa->aa_requested_nob);
1441                 RETURN(-EPROTO);
1442         }
1443
1444         if (rc != req->rq_bulk->bd_nob_transferred) {
1445                 CERROR ("Unexpected rc %d (%d transferred)\n",
1446                         rc, req->rq_bulk->bd_nob_transferred);
1447                 return (-EPROTO);
1448         }
1449
1450         if (rc < aa->aa_requested_nob)
1451                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1452
1453         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1454                 static int cksum_counter;
1455                 __u32      server_cksum = body->oa.o_cksum;
1456                 char      *via;
1457                 char      *router;
1458                 cksum_type_t cksum_type;
1459
1460                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1461                                                body->oa.o_flags : 0);
1462                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1463                                                  aa->aa_ppga, OST_READ,
1464                                                  cksum_type);
1465
1466                 if (peer->nid == req->rq_bulk->bd_sender) {
1467                         via = router = "";
1468                 } else {
1469                         via = " via ";
1470                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1471                 }
1472
1473                 if (server_cksum != client_cksum) {
1474                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1475                                            "%s%s%s inode "DFID" object "DOSTID
1476                                            " extent ["LPU64"-"LPU64"]\n",
1477                                            req->rq_import->imp_obd->obd_name,
1478                                            libcfs_nid2str(peer->nid),
1479                                            via, router,
1480                                            body->oa.o_valid & OBD_MD_FLFID ?
1481                                                 body->oa.o_parent_seq : (__u64)0,
1482                                            body->oa.o_valid & OBD_MD_FLFID ?
1483                                                 body->oa.o_parent_oid : 0,
1484                                            body->oa.o_valid & OBD_MD_FLFID ?
1485                                                 body->oa.o_parent_ver : 0,
1486                                            POSTID(&body->oa.o_oi),
1487                                            aa->aa_ppga[0]->off,
1488                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1489                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1490                                                                         1);
1491                         CERROR("client %x, server %x, cksum_type %x\n",
1492                                client_cksum, server_cksum, cksum_type);
1493                         cksum_counter = 0;
1494                         aa->aa_oa->o_cksum = client_cksum;
1495                         rc = -EAGAIN;
1496                 } else {
1497                         cksum_counter++;
1498                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1499                         rc = 0;
1500                 }
1501         } else if (unlikely(client_cksum)) {
1502                 static int cksum_missed;
1503
1504                 cksum_missed++;
1505                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1506                         CERROR("Checksum %u requested from %s but not sent\n",
1507                                cksum_missed, libcfs_nid2str(peer->nid));
1508         } else {
1509                 rc = 0;
1510         }
1511 out:
1512         if (rc >= 0)
1513                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1514                                      aa->aa_oa, &body->oa);
1515
1516         RETURN(rc);
1517 }
1518
1519 static int osc_brw_redo_request(struct ptlrpc_request *request,
1520                                 struct osc_brw_async_args *aa, int rc)
1521 {
1522         struct ptlrpc_request *new_req;
1523         struct osc_brw_async_args *new_aa;
1524         struct osc_async_page *oap;
1525         ENTRY;
1526
1527         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1528                   "redo for recoverable error %d", rc);
1529
1530         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1531                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1532                                   aa->aa_cli, aa->aa_oa,
1533                                   NULL /* lsm unused by osc currently */,
1534                                   aa->aa_page_count, aa->aa_ppga,
1535                                   &new_req, aa->aa_ocapa, 0, 1);
1536         if (rc)
1537                 RETURN(rc);
1538
1539         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1540                 if (oap->oap_request != NULL) {
1541                         LASSERTF(request == oap->oap_request,
1542                                  "request %p != oap_request %p\n",
1543                                  request, oap->oap_request);
1544                         if (oap->oap_interrupted) {
1545                                 ptlrpc_req_finished(new_req);
1546                                 RETURN(-EINTR);
1547                         }
1548                 }
1549         }
1550         /* New request takes over pga and oaps from old request.
1551          * Note that copying a list_head doesn't work, need to move it... */
1552         aa->aa_resends++;
1553         new_req->rq_interpret_reply = request->rq_interpret_reply;
1554         new_req->rq_async_args = request->rq_async_args;
1555         new_req->rq_commit_cb = request->rq_commit_cb;
1556         /* cap resend delay to the current request timeout, this is similar to
1557          * what ptlrpc does (see after_reply()) */
1558         if (aa->aa_resends > new_req->rq_timeout)
1559                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1560         else
1561                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1562         new_req->rq_generation_set = 1;
1563         new_req->rq_import_generation = request->rq_import_generation;
1564
1565         new_aa = ptlrpc_req_async_args(new_req);
1566
1567         INIT_LIST_HEAD(&new_aa->aa_oaps);
1568         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1569         INIT_LIST_HEAD(&new_aa->aa_exts);
1570         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1571         new_aa->aa_resends = aa->aa_resends;
1572
1573         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1574                 if (oap->oap_request) {
1575                         ptlrpc_req_finished(oap->oap_request);
1576                         oap->oap_request = ptlrpc_request_addref(new_req);
1577                 }
1578         }
1579
1580         new_aa->aa_ocapa = aa->aa_ocapa;
1581         aa->aa_ocapa = NULL;
1582
1583         /* XXX: This code will run into problem if we're going to support
1584          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1585          * and wait for all of them to be finished. We should inherit request
1586          * set from old request. */
1587         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1588
1589         DEBUG_REQ(D_INFO, new_req, "new request");
1590         RETURN(0);
1591 }
1592
1593 /*
1594  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1595  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1596  * fine for our small page arrays and doesn't require allocation.  its an
1597  * insertion sort that swaps elements that are strides apart, shrinking the
1598  * stride down until its '1' and the array is sorted.
1599  */
1600 static void sort_brw_pages(struct brw_page **array, int num)
1601 {
1602         int stride, i, j;
1603         struct brw_page *tmp;
1604
1605         if (num == 1)
1606                 return;
1607         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1608                 ;
1609
1610         do {
1611                 stride /= 3;
1612                 for (i = stride ; i < num ; i++) {
1613                         tmp = array[i];
1614                         j = i;
1615                         while (j >= stride && array[j - stride]->off > tmp->off) {
1616                                 array[j] = array[j - stride];
1617                                 j -= stride;
1618                         }
1619                         array[j] = tmp;
1620                 }
1621         } while (stride > 1);
1622 }
1623
1624 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1625 {
1626         LASSERT(ppga != NULL);
1627         OBD_FREE(ppga, sizeof(*ppga) * count);
1628 }
1629
1630 static int brw_interpret(const struct lu_env *env,
1631                          struct ptlrpc_request *req, void *data, int rc)
1632 {
1633         struct osc_brw_async_args *aa = data;
1634         struct osc_extent *ext;
1635         struct osc_extent *tmp;
1636         struct client_obd *cli = aa->aa_cli;
1637         ENTRY;
1638
1639         rc = osc_brw_fini_request(req, rc);
1640         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1641         /* When server return -EINPROGRESS, client should always retry
1642          * regardless of the number of times the bulk was resent already. */
1643         if (osc_recoverable_error(rc)) {
1644                 if (req->rq_import_generation !=
1645                     req->rq_import->imp_generation) {
1646                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1647                                ""DOSTID", rc = %d.\n",
1648                                req->rq_import->imp_obd->obd_name,
1649                                POSTID(&aa->aa_oa->o_oi), rc);
1650                 } else if (rc == -EINPROGRESS ||
1651                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1652                         rc = osc_brw_redo_request(req, aa, rc);
1653                 } else {
1654                         CERROR("%s: too many resent retries for object: "
1655                                ""LPU64":"LPU64", rc = %d.\n",
1656                                req->rq_import->imp_obd->obd_name,
1657                                POSTID(&aa->aa_oa->o_oi), rc);
1658                 }
1659
1660                 if (rc == 0)
1661                         RETURN(0);
1662                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1663                         rc = -EIO;
1664         }
1665
1666         if (aa->aa_ocapa) {
1667                 capa_put(aa->aa_ocapa);
1668                 aa->aa_ocapa = NULL;
1669         }
1670
1671         if (rc == 0) {
1672                 struct obdo *oa = aa->aa_oa;
1673                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1674                 unsigned long valid = 0;
1675                 struct cl_object *obj;
1676                 struct osc_async_page *last;
1677
1678                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1679                 obj = osc2cl(last->oap_obj);
1680
1681                 cl_object_attr_lock(obj);
1682                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1683                         attr->cat_blocks = oa->o_blocks;
1684                         valid |= CAT_BLOCKS;
1685                 }
1686                 if (oa->o_valid & OBD_MD_FLMTIME) {
1687                         attr->cat_mtime = oa->o_mtime;
1688                         valid |= CAT_MTIME;
1689                 }
1690                 if (oa->o_valid & OBD_MD_FLATIME) {
1691                         attr->cat_atime = oa->o_atime;
1692                         valid |= CAT_ATIME;
1693                 }
1694                 if (oa->o_valid & OBD_MD_FLCTIME) {
1695                         attr->cat_ctime = oa->o_ctime;
1696                         valid |= CAT_CTIME;
1697                 }
1698
1699                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1700                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1701                         loff_t last_off = last->oap_count + last->oap_obj_off +
1702                                 last->oap_page_off;
1703
1704                         /* Change file size if this is an out of quota or
1705                          * direct IO write and it extends the file size */
1706                         if (loi->loi_lvb.lvb_size < last_off) {
1707                                 attr->cat_size = last_off;
1708                                 valid |= CAT_SIZE;
1709                         }
1710                         /* Extend KMS if it's not a lockless write */
1711                         if (loi->loi_kms < last_off &&
1712                             oap2osc_page(last)->ops_srvlock == 0) {
1713                                 attr->cat_kms = last_off;
1714                                 valid |= CAT_KMS;
1715                         }
1716                 }
1717
1718                 if (valid != 0)
1719                         cl_object_attr_set(env, obj, attr, valid);
1720                 cl_object_attr_unlock(obj);
1721         }
1722         OBDO_FREE(aa->aa_oa);
1723
1724         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1725                 osc_inc_unstable_pages(req);
1726
1727         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1728                 list_del_init(&ext->oe_link);
1729                 osc_extent_finish(env, ext, 1, rc);
1730         }
1731         LASSERT(list_empty(&aa->aa_exts));
1732         LASSERT(list_empty(&aa->aa_oaps));
1733
1734         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1735                           req->rq_bulk->bd_nob_transferred);
1736         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1737         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1738
1739         spin_lock(&cli->cl_loi_list_lock);
1740         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1741          * is called so we know whether to go to sync BRWs or wait for more
1742          * RPCs to complete */
1743         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1744                 cli->cl_w_in_flight--;
1745         else
1746                 cli->cl_r_in_flight--;
1747         osc_wake_cache_waiters(cli);
1748         spin_unlock(&cli->cl_loi_list_lock);
1749
1750         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1751         RETURN(rc);
1752 }
1753
1754 static void brw_commit(struct ptlrpc_request *req)
1755 {
1756         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1757          * this called via the rq_commit_cb, I need to ensure
1758          * osc_dec_unstable_pages is still called. Otherwise unstable
1759          * pages may be leaked. */
1760         spin_lock(&req->rq_lock);
1761         if (likely(req->rq_unstable)) {
1762                 req->rq_unstable = 0;
1763                 spin_unlock(&req->rq_lock);
1764
1765                 osc_dec_unstable_pages(req);
1766         } else {
1767                 req->rq_committed = 1;
1768                 spin_unlock(&req->rq_lock);
1769         }
1770 }
1771
1772 /**
1773  * Build an RPC by the list of extent @ext_list. The caller must ensure
1774  * that the total pages in this list are NOT over max pages per RPC.
1775  * Extents in the list must be in OES_RPC state.
1776  */
1777 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1778                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
1779 {
1780         struct ptlrpc_request           *req = NULL;
1781         struct osc_extent               *ext;
1782         struct brw_page                 **pga = NULL;
1783         struct osc_brw_async_args       *aa = NULL;
1784         struct obdo                     *oa = NULL;
1785         struct osc_async_page           *oap;
1786         struct osc_async_page           *tmp;
1787         struct cl_req                   *clerq = NULL;
1788         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1789                                                                       CRT_READ;
1790         struct cl_req_attr              *crattr = NULL;
1791         obd_off                         starting_offset = OBD_OBJECT_EOF;
1792         obd_off                         ending_offset = 0;
1793         int                             mpflag = 0;
1794         int                             mem_tight = 0;
1795         int                             page_count = 0;
1796         bool                            soft_sync = false;
1797         int                             i;
1798         int                             rc;
1799         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1800         struct ost_body                 *body;
1801         ENTRY;
1802         LASSERT(!list_empty(ext_list));
1803
1804         /* add pages into rpc_list to build BRW rpc */
1805         list_for_each_entry(ext, ext_list, oe_link) {
1806                 LASSERT(ext->oe_state == OES_RPC);
1807                 mem_tight |= ext->oe_memalloc;
1808                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1809                         ++page_count;
1810                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1811                         if (starting_offset > oap->oap_obj_off)
1812                                 starting_offset = oap->oap_obj_off;
1813                         else
1814                                 LASSERT(oap->oap_page_off == 0);
1815                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1816                                 ending_offset = oap->oap_obj_off +
1817                                                 oap->oap_count;
1818                         else
1819                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1820                                         PAGE_CACHE_SIZE);
1821                 }
1822         }
1823
1824         soft_sync = osc_over_unstable_soft_limit(cli);
1825         if (mem_tight)
1826                 mpflag = cfs_memory_pressure_get_and_set();
1827
1828         OBD_ALLOC(crattr, sizeof(*crattr));
1829         if (crattr == NULL)
1830                 GOTO(out, rc = -ENOMEM);
1831
1832         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1833         if (pga == NULL)
1834                 GOTO(out, rc = -ENOMEM);
1835
1836         OBDO_ALLOC(oa);
1837         if (oa == NULL)
1838                 GOTO(out, rc = -ENOMEM);
1839
1840         i = 0;
1841         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1842                 struct cl_page *page = oap2cl_page(oap);
1843                 if (clerq == NULL) {
1844                         clerq = cl_req_alloc(env, page, crt,
1845                                              1 /* only 1-object rpcs for now */);
1846                         if (IS_ERR(clerq))
1847                                 GOTO(out, rc = PTR_ERR(clerq));
1848                 }
1849                 if (mem_tight)
1850                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1851                 if (soft_sync)
1852                         oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1853                 pga[i] = &oap->oap_brw_page;
1854                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1855                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1856                        pga[i]->pg, page_index(oap->oap_page), oap,
1857                        pga[i]->flag);
1858                 i++;
1859                 cl_req_page_add(env, clerq, page);
1860         }
1861
1862         /* always get the data for the obdo for the rpc */
1863         LASSERT(clerq != NULL);
1864         crattr->cra_oa = oa;
1865         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1866
1867         rc = cl_req_prep(env, clerq);
1868         if (rc != 0) {
1869                 CERROR("cl_req_prep failed: %d\n", rc);
1870                 GOTO(out, rc);
1871         }
1872
1873         sort_brw_pages(pga, page_count);
1874         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1875                         pga, &req, crattr->cra_capa, 1, 0);
1876         if (rc != 0) {
1877                 CERROR("prep_req failed: %d\n", rc);
1878                 GOTO(out, rc);
1879         }
1880
1881         req->rq_commit_cb = brw_commit;
1882         req->rq_interpret_reply = brw_interpret;
1883
1884         if (mem_tight != 0)
1885                 req->rq_memalloc = 1;
1886
1887         /* Need to update the timestamps after the request is built in case
1888          * we race with setattr (locally or in queue at OST).  If OST gets
1889          * later setattr before earlier BRW (as determined by the request xid),
1890          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1891          * way to do this in a single call.  bug 10150 */
1892         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1893         crattr->cra_oa = &body->oa;
1894         cl_req_attr_set(env, clerq, crattr,
1895                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1896
1897         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1898
1899         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1900         aa = ptlrpc_req_async_args(req);
1901         INIT_LIST_HEAD(&aa->aa_oaps);
1902         list_splice_init(&rpc_list, &aa->aa_oaps);
1903         INIT_LIST_HEAD(&aa->aa_exts);
1904         list_splice_init(ext_list, &aa->aa_exts);
1905         aa->aa_clerq = clerq;
1906
1907         /* queued sync pages can be torn down while the pages
1908          * were between the pending list and the rpc */
1909         tmp = NULL;
1910         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1911                 /* only one oap gets a request reference */
1912                 if (tmp == NULL)
1913                         tmp = oap;
1914                 if (oap->oap_interrupted && !req->rq_intr) {
1915                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1916                                         oap, req);
1917                         ptlrpc_mark_interrupted(req);
1918                 }
1919         }
1920         if (tmp != NULL)
1921                 tmp->oap_request = ptlrpc_request_addref(req);
1922
1923         spin_lock(&cli->cl_loi_list_lock);
1924         starting_offset >>= PAGE_CACHE_SHIFT;
1925         if (cmd == OBD_BRW_READ) {
1926                 cli->cl_r_in_flight++;
1927                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1928                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1929                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1930                                       starting_offset + 1);
1931         } else {
1932                 cli->cl_w_in_flight++;
1933                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1934                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1935                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1936                                       starting_offset + 1);
1937         }
1938         spin_unlock(&cli->cl_loi_list_lock);
1939
1940         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1941                   page_count, aa, cli->cl_r_in_flight,
1942                   cli->cl_w_in_flight);
1943
1944         /* XXX: Maybe the caller can check the RPC bulk descriptor to
1945          * see which CPU/NUMA node the majority of pages were allocated
1946          * on, and try to assign the async RPC to the CPU core
1947          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
1948          *
1949          * But on the other hand, we expect that multiple ptlrpcd
1950          * threads and the initial write sponsor can run in parallel,
1951          * especially when data checksum is enabled, which is CPU-bound
1952          * operation and single ptlrpcd thread cannot process in time.
1953          * So more ptlrpcd threads sharing BRW load
1954          * (with PDL_POLICY_ROUND) seems better.
1955          */
1956         ptlrpcd_add_req(req, pol, -1);
1957         rc = 0;
1958         EXIT;
1959
1960 out:
1961         if (mem_tight != 0)
1962                 cfs_memory_pressure_restore(mpflag);
1963
1964         if (crattr != NULL) {
1965                 capa_put(crattr->cra_capa);
1966                 OBD_FREE(crattr, sizeof(*crattr));
1967         }
1968
1969         if (rc != 0) {
1970                 LASSERT(req == NULL);
1971
1972                 if (oa)
1973                         OBDO_FREE(oa);
1974                 if (pga)
1975                         OBD_FREE(pga, sizeof(*pga) * page_count);
1976                 /* this should happen rarely and is pretty bad, it makes the
1977                  * pending list not follow the dirty order */
1978                 while (!list_empty(ext_list)) {
1979                         ext = list_entry(ext_list->next, struct osc_extent,
1980                                          oe_link);
1981                         list_del_init(&ext->oe_link);
1982                         osc_extent_finish(env, ext, 0, rc);
1983                 }
1984                 if (clerq && !IS_ERR(clerq))
1985                         cl_req_completion(env, clerq, rc);
1986         }
1987         RETURN(rc);
1988 }
1989
1990 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1991                                         struct ldlm_enqueue_info *einfo)
1992 {
1993         void *data = einfo->ei_cbdata;
1994         int set = 0;
1995
1996         LASSERT(lock != NULL);
1997         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1998         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1999         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2000         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2001
2002         lock_res_and_lock(lock);
2003
2004         if (lock->l_ast_data == NULL)
2005                 lock->l_ast_data = data;
2006         if (lock->l_ast_data == data)
2007                 set = 1;
2008
2009         unlock_res_and_lock(lock);
2010
2011         return set;
2012 }
2013
2014 static int osc_set_data_with_check(struct lustre_handle *lockh,
2015                                    struct ldlm_enqueue_info *einfo)
2016 {
2017         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2018         int set = 0;
2019
2020         if (lock != NULL) {
2021                 set = osc_set_lock_data_with_check(lock, einfo);
2022                 LDLM_LOCK_PUT(lock);
2023         } else
2024                 CERROR("lockh %p, data %p - client evicted?\n",
2025                        lockh, einfo->ei_cbdata);
2026         return set;
2027 }
2028
2029 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2030                              ldlm_iterator_t replace, void *data)
2031 {
2032         struct ldlm_res_id res_id;
2033         struct obd_device *obd = class_exp2obd(exp);
2034
2035         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2036         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2037         return 0;
2038 }
2039
2040 /* find any ldlm lock of the inode in osc
2041  * return 0    not find
2042  *        1    find one
2043  *      < 0    error */
2044 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2045                            ldlm_iterator_t replace, void *data)
2046 {
2047         struct ldlm_res_id res_id;
2048         struct obd_device *obd = class_exp2obd(exp);
2049         int rc = 0;
2050
2051         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2052         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2053         if (rc == LDLM_ITER_STOP)
2054                 return(1);
2055         if (rc == LDLM_ITER_CONTINUE)
2056                 return(0);
2057         return(rc);
2058 }
2059
2060 static int osc_enqueue_fini(struct ptlrpc_request *req,
2061                             osc_enqueue_upcall_f upcall, void *cookie,
2062                             struct lustre_handle *lockh, ldlm_mode_t mode,
2063                             __u64 *flags, int agl, int errcode)
2064 {
2065         bool intent = *flags & LDLM_FL_HAS_INTENT;
2066         int rc;
2067         ENTRY;
2068
2069         /* The request was created before ldlm_cli_enqueue call. */
2070         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2071                 struct ldlm_reply *rep;
2072
2073                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2074                 LASSERT(rep != NULL);
2075
2076                 rep->lock_policy_res1 =
2077                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2078                 if (rep->lock_policy_res1)
2079                         errcode = rep->lock_policy_res1;
2080                 if (!agl)
2081                         *flags |= LDLM_FL_LVB_READY;
2082         } else if (errcode == ELDLM_OK) {
2083                 *flags |= LDLM_FL_LVB_READY;
2084         }
2085
2086         /* Call the update callback. */
2087         rc = (*upcall)(cookie, lockh, errcode);
2088
2089         /* release the reference taken in ldlm_cli_enqueue() */
2090         if (errcode == ELDLM_LOCK_MATCHED)
2091                 errcode = ELDLM_OK;
2092         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2093                 ldlm_lock_decref(lockh, mode);
2094
2095         RETURN(rc);
2096 }
2097
2098 static int osc_enqueue_interpret(const struct lu_env *env,
2099                                  struct ptlrpc_request *req,
2100                                  struct osc_enqueue_args *aa, int rc)
2101 {
2102         struct ldlm_lock *lock;
2103         struct lustre_handle *lockh = &aa->oa_lockh;
2104         ldlm_mode_t mode = aa->oa_mode;
2105         struct ost_lvb *lvb = aa->oa_lvb;
2106         __u32 lvb_len = sizeof(*lvb);
2107         __u64 flags = 0;
2108
2109         ENTRY;
2110
2111         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2112          * be valid. */
2113         lock = ldlm_handle2lock(lockh);
2114         LASSERTF(lock != NULL,
2115                  "lockh "LPX64", req %p, aa %p - client evicted?\n",
2116                  lockh->cookie, req, aa);
2117
2118         /* Take an additional reference so that a blocking AST that
2119          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2120          * to arrive after an upcall has been executed by
2121          * osc_enqueue_fini(). */
2122         ldlm_lock_addref(lockh, mode);
2123
2124         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2125         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2126
2127         /* Let CP AST to grant the lock first. */
2128         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2129
2130         if (aa->oa_agl) {
2131                 LASSERT(aa->oa_lvb == NULL);
2132                 LASSERT(aa->oa_flags == NULL);
2133                 aa->oa_flags = &flags;
2134         }
2135
2136         /* Complete obtaining the lock procedure. */
2137         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2138                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2139                                    lockh, rc);
2140         /* Complete osc stuff. */
2141         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2142                               aa->oa_flags, aa->oa_agl, rc);
2143
2144         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2145
2146         ldlm_lock_decref(lockh, mode);
2147         LDLM_LOCK_PUT(lock);
2148         RETURN(rc);
2149 }
2150
/* Request-set pointer initialised to the constant (void *)1 rather than to
 * the address of an allocated set, so it must never be dereferenced.
 * NOTE(review): presumably a sentinel that callers pass and compare against
 * to select handling by ptlrpcd - confirm against osc_enqueue_base() and the
 * other users of this symbol. */
2151 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2152
2153 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2154  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2155  * other synchronous requests, however keeping some locks and trying to obtain
2156  * others may take a considerable amount of time in a case of ost failure; and
2157  * when other sync requests do not get released lock from a client, the client
2158  * is evicted from the cluster -- such scenarios make life difficult, so
2159  * release locks just after they are obtained. */
2160 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2161                      __u64 *flags, ldlm_policy_data_t *policy,
2162                      struct ost_lvb *lvb, int kms_valid,
2163                      osc_enqueue_upcall_f upcall, void *cookie,
2164                      struct ldlm_enqueue_info *einfo,
2165                      struct ptlrpc_request_set *rqset, int async, int agl)
2166 {
2167         struct obd_device *obd = exp->exp_obd;
2168         struct lustre_handle lockh = { 0 };
2169         struct ptlrpc_request *req = NULL;
2170         int intent = *flags & LDLM_FL_HAS_INTENT;
2171         __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
2172         ldlm_mode_t mode;
2173         int rc;
2174         ENTRY;
2175
2176         /* Filesystem lock extents are extended to page boundaries so that
2177          * dealing with the page cache is a little smoother.  */
2178         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2179         policy->l_extent.end |= ~CFS_PAGE_MASK;
2180
2181         /*
2182          * kms is not valid when either object is completely fresh (so that no
2183          * locks are cached), or object was evicted. In the latter case cached
2184          * lock cannot be used, because it would prime inode state with
2185          * potentially stale LVB.
2186          */
2187         if (!kms_valid)
2188                 goto no_match;
2189
2190         /* Next, search for already existing extent locks that will cover us */
2191         /* If we're trying to read, we also search for an existing PW lock.  The
2192          * VFS and page cache already protect us locally, so lots of readers/
2193          * writers can share a single PW lock.
2194          *
2195          * There are problems with conversion deadlocks, so instead of
2196          * converting a read lock to a write lock, we'll just enqueue a new
2197          * one.
2198          *
2199          * At some point we should cancel the read lock instead of making them
2200          * send us a blocking callback, but there are problems with canceling
2201          * locks out from other users right now, too. */
2202         mode = einfo->ei_mode;
2203         if (einfo->ei_mode == LCK_PR)
2204                 mode |= LCK_PW;
2205         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2206                                einfo->ei_type, policy, mode, &lockh, 0);
2207         if (mode) {
2208                 struct ldlm_lock *matched;
2209
2210                 if (*flags & LDLM_FL_TEST_LOCK)
2211                         RETURN(ELDLM_OK);
2212
2213                 matched = ldlm_handle2lock(&lockh);
2214                 if (agl) {
2215                         /* AGL enqueues DLM locks speculatively. Therefore if
2216                          * it already exists a DLM lock, it wll just inform the
2217                          * caller to cancel the AGL process for this stripe. */
2218                         ldlm_lock_decref(&lockh, mode);
2219                         LDLM_LOCK_PUT(matched);
2220                         RETURN(-ECANCELED);
2221                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2222                         *flags |= LDLM_FL_LVB_READY;
2223
2224                         /* We already have a lock, and it's referenced. */
2225                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2226
2227                         ldlm_lock_decref(&lockh, mode);
2228                         LDLM_LOCK_PUT(matched);
2229                         RETURN(ELDLM_OK);
2230                 } else {
2231                         ldlm_lock_decref(&lockh, mode);
2232                         LDLM_LOCK_PUT(matched);
2233                 }
2234         }
2235
2236 no_match:
2237         if (*flags & LDLM_FL_TEST_LOCK)
2238                 RETURN(-ENOLCK);
2239
2240         if (intent) {
2241                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2242                                            &RQF_LDLM_ENQUEUE_LVB);
2243                 if (req == NULL)
2244                         RETURN(-ENOMEM);
2245
2246                 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2247                 if (rc < 0) {
2248                         ptlrpc_request_free(req);
2249                         RETURN(rc);
2250                 }
2251
2252                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2253                                      sizeof *lvb);
2254                 ptlrpc_request_set_replen(req);
2255         }
2256
2257         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2258         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2259
2260         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2261                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2262         if (async) {
2263                 if (!rc) {
2264                         struct osc_enqueue_args *aa;
2265                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2266                         aa = ptlrpc_req_async_args(req);
2267                         aa->oa_exp    = exp;
2268                         aa->oa_mode   = einfo->ei_mode;
2269                         aa->oa_type   = einfo->ei_type;
2270                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2271                         aa->oa_upcall = upcall;
2272                         aa->oa_cookie = cookie;
2273                         aa->oa_agl    = !!agl;
2274                         if (!agl) {
2275                                 aa->oa_flags  = flags;
2276                                 aa->oa_lvb    = lvb;
2277                         } else {
2278                                 /* AGL is essentially to enqueue an DLM lock
2279                                  * in advance, so we don't care about the
2280                                  * result of AGL enqueue. */
2281                                 aa->oa_lvb    = NULL;
2282                                 aa->oa_flags  = NULL;
2283                         }
2284
2285                         req->rq_interpret_reply =
2286                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2287                         if (rqset == PTLRPCD_SET)
2288                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2289                         else
2290                                 ptlrpc_set_add_req(rqset, req);
2291                 } else if (intent) {
2292                         ptlrpc_req_finished(req);
2293                 }
2294                 RETURN(rc);
2295         }
2296
2297         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2298                               flags, agl, rc);
2299         if (intent)
2300                 ptlrpc_req_finished(req);
2301
2302         RETURN(rc);
2303 }
2304
2305 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2306                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2307                    __u64 *flags, void *data, struct lustre_handle *lockh,
2308                    int unref)
2309 {
2310         struct obd_device *obd = exp->exp_obd;
2311         __u64 lflags = *flags;
2312         ldlm_mode_t rc;
2313         ENTRY;
2314
2315         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2316                 RETURN(-EIO);
2317
2318         /* Filesystem lock extents are extended to page boundaries so that
2319          * dealing with the page cache is a little smoother */
2320         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2321         policy->l_extent.end |= ~CFS_PAGE_MASK;
2322
2323         /* Next, search for already existing extent locks that will cover us */
2324         /* If we're trying to read, we also search for an existing PW lock.  The
2325          * VFS and page cache already protect us locally, so lots of readers/
2326          * writers can share a single PW lock. */
2327         rc = mode;
2328         if (mode == LCK_PR)
2329                 rc |= LCK_PW;
2330         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2331                              res_id, type, policy, rc, lockh, unref);
2332         if (rc) {
2333                 if (data != NULL) {
2334                         if (!osc_set_data_with_check(lockh, data)) {
2335                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2336                                         ldlm_lock_decref(lockh, rc);
2337                                 RETURN(0);
2338                         }
2339                 }
2340                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2341                         ldlm_lock_addref(lockh, LCK_PR);
2342                         ldlm_lock_decref(lockh, LCK_PW);
2343                 }
2344                 RETURN(rc);
2345         }
2346         RETURN(rc);
2347 }
2348
2349 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2350 {
2351         ENTRY;
2352
2353         if (unlikely(mode == LCK_GROUP))
2354                 ldlm_lock_decref_and_cancel(lockh, mode);
2355         else
2356                 ldlm_lock_decref(lockh, mode);
2357
2358         RETURN(0);
2359 }
2360
2361 static int osc_statfs_interpret(const struct lu_env *env,
2362                                 struct ptlrpc_request *req,
2363                                 struct osc_async_args *aa, int rc)
2364 {
2365         struct obd_statfs *msfs;
2366         ENTRY;
2367
2368         if (rc == -EBADR)
2369                 /* The request has in fact never been sent
2370                  * due to issues at a higher level (LOV).
2371                  * Exit immediately since the caller is
2372                  * aware of the problem and takes care
2373                  * of the clean up */
2374                  RETURN(rc);
2375
2376         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2377             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2378                 GOTO(out, rc = 0);
2379
2380         if (rc != 0)
2381                 GOTO(out, rc);
2382
2383         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2384         if (msfs == NULL) {
2385                 GOTO(out, rc = -EPROTO);
2386         }
2387
2388         *aa->aa_oi->oi_osfs = *msfs;
2389 out:
2390         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2391         RETURN(rc);
2392 }
2393
2394 static int osc_statfs_async(struct obd_export *exp,
2395                             struct obd_info *oinfo, __u64 max_age,
2396                             struct ptlrpc_request_set *rqset)
2397 {
2398         struct obd_device     *obd = class_exp2obd(exp);
2399         struct ptlrpc_request *req;
2400         struct osc_async_args *aa;
2401         int                    rc;
2402         ENTRY;
2403
2404         /* We could possibly pass max_age in the request (as an absolute
2405          * timestamp or a "seconds.usec ago") so the target can avoid doing
2406          * extra calls into the filesystem if that isn't necessary (e.g.
2407          * during mount that would help a bit).  Having relative timestamps
2408          * is not so great if request processing is slow, while absolute
2409          * timestamps are not ideal because they need time synchronization. */
2410         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2411         if (req == NULL)
2412                 RETURN(-ENOMEM);
2413
2414         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2415         if (rc) {
2416                 ptlrpc_request_free(req);
2417                 RETURN(rc);
2418         }
2419         ptlrpc_request_set_replen(req);
2420         req->rq_request_portal = OST_CREATE_PORTAL;
2421         ptlrpc_at_set_req_timeout(req);
2422
2423         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2424                 /* procfs requests not want stat in wait for avoid deadlock */
2425                 req->rq_no_resend = 1;
2426                 req->rq_no_delay = 1;
2427         }
2428
2429         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2430         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2431         aa = ptlrpc_req_async_args(req);
2432         aa->aa_oi = oinfo;
2433
2434         ptlrpc_set_add_req(rqset, req);
2435         RETURN(0);
2436 }
2437
2438 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2439                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2440 {
2441         struct obd_device     *obd = class_exp2obd(exp);
2442         struct obd_statfs     *msfs;
2443         struct ptlrpc_request *req;
2444         struct obd_import     *imp = NULL;
2445         int rc;
2446         ENTRY;
2447
2448         /*Since the request might also come from lprocfs, so we need
2449          *sync this with client_disconnect_export Bug15684*/
2450         down_read(&obd->u.cli.cl_sem);
2451         if (obd->u.cli.cl_import)
2452                 imp = class_import_get(obd->u.cli.cl_import);
2453         up_read(&obd->u.cli.cl_sem);
2454         if (!imp)
2455                 RETURN(-ENODEV);
2456
2457         /* We could possibly pass max_age in the request (as an absolute
2458          * timestamp or a "seconds.usec ago") so the target can avoid doing
2459          * extra calls into the filesystem if that isn't necessary (e.g.
2460          * during mount that would help a bit).  Having relative timestamps
2461          * is not so great if request processing is slow, while absolute
2462          * timestamps are not ideal because they need time synchronization. */
2463         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2464
2465         class_import_put(imp);
2466
2467         if (req == NULL)
2468                 RETURN(-ENOMEM);
2469
2470         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2471         if (rc) {
2472                 ptlrpc_request_free(req);
2473                 RETURN(rc);
2474         }
2475         ptlrpc_request_set_replen(req);
2476         req->rq_request_portal = OST_CREATE_PORTAL;
2477         ptlrpc_at_set_req_timeout(req);
2478
2479         if (flags & OBD_STATFS_NODELAY) {
2480                 /* procfs requests not want stat in wait for avoid deadlock */
2481                 req->rq_no_resend = 1;
2482                 req->rq_no_delay = 1;
2483         }
2484
2485         rc = ptlrpc_queue_wait(req);
2486         if (rc)
2487                 GOTO(out, rc);
2488
2489         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2490         if (msfs == NULL) {
2491                 GOTO(out, rc = -EPROTO);
2492         }
2493
2494         *osfs = *msfs;
2495
2496         EXIT;
2497  out:
2498         ptlrpc_req_finished(req);
2499         return rc;
2500 }
2501
2502 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2503                          void *karg, void *uarg)
2504 {
2505         struct obd_device *obd = exp->exp_obd;
2506         struct obd_ioctl_data *data = karg;
2507         int err = 0;
2508         ENTRY;
2509
2510         if (!try_module_get(THIS_MODULE)) {
2511                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2512                        module_name(THIS_MODULE));
2513                 return -EINVAL;
2514         }
2515         switch (cmd) {
2516         case OBD_IOC_CLIENT_RECOVER:
2517                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2518                                             data->ioc_inlbuf1, 0);
2519                 if (err > 0)
2520                         err = 0;
2521                 GOTO(out, err);
2522         case IOC_OSC_SET_ACTIVE:
2523                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2524                                                data->ioc_offset);
2525                 GOTO(out, err);
2526         case OBD_IOC_POLL_QUOTACHECK:
2527                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2528                 GOTO(out, err);
2529         case OBD_IOC_PING_TARGET:
2530                 err = ptlrpc_obd_ping(obd);
2531                 GOTO(out, err);
2532         default:
2533                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2534                        cmd, current_comm());
2535                 GOTO(out, err = -ENOTTY);
2536         }
2537 out:
2538         module_put(THIS_MODULE);
2539         return err;
2540 }
2541
2542 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2543                         obd_count keylen, void *key, __u32 *vallen, void *val,
2544                         struct lov_stripe_md *lsm)
2545 {
2546         ENTRY;
2547         if (!vallen || !val)
2548                 RETURN(-EFAULT);
2549
2550         if (KEY_IS(KEY_FIEMAP)) {
2551                 struct ll_fiemap_info_key *fm_key =
2552                                 (struct ll_fiemap_info_key *)key;
2553                 struct ldlm_res_id       res_id;
2554                 ldlm_policy_data_t       policy;
2555                 struct lustre_handle     lockh;
2556                 ldlm_mode_t              mode = 0;
2557                 struct ptlrpc_request   *req;
2558                 struct ll_user_fiemap   *reply;
2559                 char                    *tmp;
2560                 int                      rc;
2561
2562                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2563                         goto skip_locking;
2564
2565                 policy.l_extent.start = fm_key->fiemap.fm_start &
2566                                                 CFS_PAGE_MASK;
2567
2568                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2569                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2570                         policy.l_extent.end = OBD_OBJECT_EOF;
2571                 else
2572                         policy.l_extent.end = (fm_key->fiemap.fm_start +
2573                                 fm_key->fiemap.fm_length +
2574                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2575
2576                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2577                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2578                                        LDLM_FL_BLOCK_GRANTED |
2579                                        LDLM_FL_LVB_READY,
2580                                        &res_id, LDLM_EXTENT, &policy,
2581                                        LCK_PR | LCK_PW, &lockh, 0);
2582                 if (mode) { /* lock is cached on client */
2583                         if (mode != LCK_PR) {
2584                                 ldlm_lock_addref(&lockh, LCK_PR);
2585                                 ldlm_lock_decref(&lockh, LCK_PW);
2586                         }
2587                 } else { /* no cached lock, needs acquire lock on server side */
2588                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2589                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2590                 }
2591
2592 skip_locking:
2593                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2594                                            &RQF_OST_GET_INFO_FIEMAP);
2595                 if (req == NULL)
2596                         GOTO(drop_lock, rc = -ENOMEM);
2597
2598                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2599                                      RCL_CLIENT, keylen);
2600                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2601                                      RCL_CLIENT, *vallen);
2602                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2603                                      RCL_SERVER, *vallen);
2604
2605                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2606                 if (rc) {
2607                         ptlrpc_request_free(req);
2608                         GOTO(drop_lock, rc);
2609                 }
2610
2611                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2612                 memcpy(tmp, key, keylen);
2613                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2614                 memcpy(tmp, val, *vallen);
2615
2616                 ptlrpc_request_set_replen(req);
2617                 rc = ptlrpc_queue_wait(req);
2618                 if (rc)
2619                         GOTO(fini_req, rc);
2620
2621                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2622                 if (reply == NULL)
2623                         GOTO(fini_req, rc = -EPROTO);
2624
2625                 memcpy(val, reply, *vallen);
2626 fini_req:
2627                 ptlrpc_req_finished(req);
2628 drop_lock:
2629                 if (mode)
2630                         ldlm_lock_decref(&lockh, LCK_PR);
2631                 RETURN(rc);
2632         }
2633
2634         RETURN(-EINVAL);
2635 }
2636
2637 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2638                               obd_count keylen, void *key, obd_count vallen,
2639                               void *val, struct ptlrpc_request_set *set)
2640 {
2641         struct ptlrpc_request *req;
2642         struct obd_device     *obd = exp->exp_obd;
2643         struct obd_import     *imp = class_exp2cliimp(exp);
2644         char                  *tmp;
2645         int                    rc;
2646         ENTRY;
2647
2648         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2649
2650         if (KEY_IS(KEY_CHECKSUM)) {
2651                 if (vallen != sizeof(int))
2652                         RETURN(-EINVAL);
2653                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2654                 RETURN(0);
2655         }
2656
2657         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2658                 sptlrpc_conf_client_adapt(obd);
2659                 RETURN(0);
2660         }
2661
2662         if (KEY_IS(KEY_FLUSH_CTX)) {
2663                 sptlrpc_import_flush_my_ctx(imp);
2664                 RETURN(0);
2665         }
2666
2667         if (KEY_IS(KEY_CACHE_SET)) {
2668                 struct client_obd *cli = &obd->u.cli;
2669
2670                 LASSERT(cli->cl_cache == NULL); /* only once */
2671                 cli->cl_cache = (struct cl_client_cache *)val;
2672                 atomic_inc(&cli->cl_cache->ccc_users);
2673                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2674
2675                 /* add this osc into entity list */
2676                 LASSERT(list_empty(&cli->cl_lru_osc));
2677                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2678                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2679                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2680
2681                 RETURN(0);
2682         }
2683
2684         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2685                 struct client_obd *cli = &obd->u.cli;
2686                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2687                 long target = *(long *)val;
2688
2689                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2690                 *(long *)val -= nr;
2691                 RETURN(0);
2692         }
2693
2694         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2695                 RETURN(-EINVAL);
2696
2697         /* We pass all other commands directly to OST. Since nobody calls osc
2698            methods directly and everybody is supposed to go through LOV, we
2699            assume lov checked invalid values for us.
2700            The only recognised values so far are evict_by_nid and mds_conn.
2701            Even if something bad goes through, we'd get a -EINVAL from OST
2702            anyway. */
2703
2704         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2705                                                 &RQF_OST_SET_GRANT_INFO :
2706                                                 &RQF_OBD_SET_INFO);
2707         if (req == NULL)
2708                 RETURN(-ENOMEM);
2709
2710         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2711                              RCL_CLIENT, keylen);
2712         if (!KEY_IS(KEY_GRANT_SHRINK))
2713                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2714                                      RCL_CLIENT, vallen);
2715         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2716         if (rc) {
2717                 ptlrpc_request_free(req);
2718                 RETURN(rc);
2719         }
2720
2721         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2722         memcpy(tmp, key, keylen);
2723         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2724                                                         &RMF_OST_BODY :
2725                                                         &RMF_SETINFO_VAL);
2726         memcpy(tmp, val, vallen);
2727
2728         if (KEY_IS(KEY_GRANT_SHRINK)) {
2729                 struct osc_grant_args *aa;
2730                 struct obdo *oa;
2731
2732                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2733                 aa = ptlrpc_req_async_args(req);
2734                 OBDO_ALLOC(oa);
2735                 if (!oa) {
2736                         ptlrpc_req_finished(req);
2737                         RETURN(-ENOMEM);
2738                 }
2739                 *oa = ((struct ost_body *)val)->oa;
2740                 aa->aa_oa = oa;
2741                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2742         }
2743
2744         ptlrpc_request_set_replen(req);
2745         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2746                 LASSERT(set != NULL);
2747                 ptlrpc_set_add_req(set, req);
2748                 ptlrpc_check_set(NULL, set);
2749         } else
2750                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2751
2752         RETURN(0);
2753 }
2754
2755 static int osc_reconnect(const struct lu_env *env,
2756                          struct obd_export *exp, struct obd_device *obd,
2757                          struct obd_uuid *cluuid,
2758                          struct obd_connect_data *data,
2759                          void *localdata)
2760 {
2761         struct client_obd *cli = &obd->u.cli;
2762
2763         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2764                 long lost_grant;
2765
2766                 spin_lock(&cli->cl_loi_list_lock);
2767                 data->ocd_grant = (cli->cl_avail_grant +
2768                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2769                                   2 * cli_brw_size(obd);
2770                 lost_grant = cli->cl_lost_grant;
2771                 cli->cl_lost_grant = 0;
2772                 spin_unlock(&cli->cl_loi_list_lock);
2773
2774                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2775                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2776                        data->ocd_version, data->ocd_grant, lost_grant);
2777         }
2778
2779         RETURN(0);
2780 }
2781
2782 static int osc_disconnect(struct obd_export *exp)
2783 {
2784         struct obd_device *obd = class_exp2obd(exp);
2785         int rc;
2786
2787         rc = client_disconnect_export(exp);
2788         /**
2789          * Initially we put del_shrink_grant before disconnect_export, but it
2790          * causes the following problem if setup (connect) and cleanup
2791          * (disconnect) are tangled together.
2792          *      connect p1                     disconnect p2
2793          *   ptlrpc_connect_import
2794          *     ...............               class_manual_cleanup
2795          *                                     osc_disconnect
2796          *                                     del_shrink_grant
2797          *   ptlrpc_connect_interrupt
2798          *     init_grant_shrink
2799          *   add this client to shrink list
2800          *                                      cleanup_osc
2801          * Bang! pinger trigger the shrink.
2802          * So the osc should be disconnected from the shrink list, after we
2803          * are sure the import has been destroyed. BUG18662
2804          */
2805         if (obd->u.cli.cl_import == NULL)
2806                 osc_del_shrink_grant(&obd->u.cli);
2807         return rc;
2808 }
2809
2810 static int osc_import_event(struct obd_device *obd,
2811                             struct obd_import *imp,
2812                             enum obd_import_event event)
2813 {
2814         struct client_obd *cli;
2815         int rc = 0;
2816
2817         ENTRY;
2818         LASSERT(imp->imp_obd == obd);
2819
2820         switch (event) {
2821         case IMP_EVENT_DISCON: {
2822                 cli = &obd->u.cli;
2823                 spin_lock(&cli->cl_loi_list_lock);
2824                 cli->cl_avail_grant = 0;
2825                 cli->cl_lost_grant = 0;
2826                 spin_unlock(&cli->cl_loi_list_lock);
2827                 break;
2828         }
2829         case IMP_EVENT_INACTIVE: {
2830                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2831                 break;
2832         }
2833         case IMP_EVENT_INVALIDATE: {
2834                 struct ldlm_namespace *ns = obd->obd_namespace;
2835                 struct lu_env         *env;
2836                 int                    refcheck;
2837
2838                 env = cl_env_get(&refcheck);
2839                 if (!IS_ERR(env)) {
2840                         /* Reset grants */
2841                         cli = &obd->u.cli;
2842                         /* all pages go to failing rpcs due to the invalid
2843                          * import */
2844                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
2845
2846                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2847                         cl_env_put(env, &refcheck);
2848                 } else
2849                         rc = PTR_ERR(env);
2850                 break;
2851         }
2852         case IMP_EVENT_ACTIVE: {
2853                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2854                 break;
2855         }
2856         case IMP_EVENT_OCD: {
2857                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2858
2859                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2860                         osc_init_grant(&obd->u.cli, ocd);
2861
2862                 /* See bug 7198 */
2863                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2864                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2865
2866                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2867                 break;
2868         }
2869         case IMP_EVENT_DEACTIVATE: {
2870                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2871                 break;
2872         }
2873         case IMP_EVENT_ACTIVATE: {
2874                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2875                 break;
2876         }
2877         default:
2878                 CERROR("Unknown import event %d\n", event);
2879                 LBUG();
2880         }
2881         RETURN(rc);
2882 }
2883
2884 /**
2885  * Determine whether the lock can be canceled before replaying the lock
2886  * during recovery, see bug16774 for detailed information.
2887  *
2888  * \retval zero the lock can't be canceled
2889  * \retval other ok to cancel
2890  */
2891 static int osc_cancel_weight(struct ldlm_lock *lock)
2892 {
2893         /*
2894          * Cancel all unused and granted extent lock.
2895          */
2896         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2897             lock->l_granted_mode == lock->l_req_mode &&
2898             osc_ldlm_weigh_ast(lock) == 0)
2899                 RETURN(1);
2900
2901         RETURN(0);
2902 }
2903
2904 static int brw_queue_work(const struct lu_env *env, void *data)
2905 {
2906         struct client_obd *cli = data;
2907
2908         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2909
2910         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2911         RETURN(0);
2912 }
2913
2914 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2915 {
2916         struct client_obd *cli = &obd->u.cli;
2917         struct obd_type   *type;
2918         void              *handler;
2919         int                rc;
2920         ENTRY;
2921
2922         rc = ptlrpcd_addref();
2923         if (rc)
2924                 RETURN(rc);
2925
2926         rc = client_obd_setup(obd, lcfg);
2927         if (rc)
2928                 GOTO(out_ptlrpcd, rc);
2929
2930         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2931         if (IS_ERR(handler))
2932                 GOTO(out_client_setup, rc = PTR_ERR(handler));
2933         cli->cl_writeback_work = handler;
2934
2935         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2936         if (IS_ERR(handler))
2937                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2938         cli->cl_lru_work = handler;
2939
2940         rc = osc_quota_setup(obd);
2941         if (rc)
2942                 GOTO(out_ptlrpcd_work, rc);
2943
2944         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2945
2946 #ifdef LPROCFS
2947         obd->obd_vars = lprocfs_osc_obd_vars;
2948 #endif
2949         /* If this is true then both client (osc) and server (osp) are on the
2950          * same node. The osp layer if loaded first will register the osc proc
2951          * directory. In that case this obd_device will be attached its proc
2952          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2953         type = class_search_type(LUSTRE_OSP_NAME);
2954         if (type && type->typ_procsym) {
2955                 obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name,
2956                                                            type->typ_procsym,
2957                                                            obd->obd_vars, obd);
2958                 if (IS_ERR(obd->obd_proc_entry)) {
2959                         rc = PTR_ERR(obd->obd_proc_entry);
2960                         CERROR("error %d setting up lprocfs for %s\n", rc,
2961                                obd->obd_name);
2962                         obd->obd_proc_entry = NULL;
2963                 }
2964         } else {
2965                 rc = lprocfs_obd_setup(obd);
2966         }
2967
2968         /* If the basic OSC proc tree construction succeeded then
2969          * lets do the rest. */
2970         if (rc == 0) {
2971                 lproc_osc_attach_seqstat(obd);
2972                 sptlrpc_lprocfs_cliobd_attach(obd);
2973                 ptlrpc_lprocfs_register_obd(obd);
2974         }
2975
2976         /* We need to allocate a few requests more, because
2977          * brw_interpret tries to create new requests before freeing
2978          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
2979          * reserved, but I'm afraid that might be too much wasted RAM
2980          * in fact, so 2 is just my guess and still should work. */
2981         cli->cl_import->imp_rq_pool =
2982                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
2983                                     OST_MAXREQSIZE,
2984                                     ptlrpc_add_rqs_to_pool);
2985
2986         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2987         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2988         RETURN(0);
2989
2990 out_ptlrpcd_work:
2991         if (cli->cl_writeback_work != NULL) {
2992                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2993                 cli->cl_writeback_work = NULL;
2994         }
2995         if (cli->cl_lru_work != NULL) {
2996                 ptlrpcd_destroy_work(cli->cl_lru_work);
2997                 cli->cl_lru_work = NULL;
2998         }
2999 out_client_setup:
3000         client_obd_cleanup(obd);
3001 out_ptlrpcd:
3002         ptlrpcd_decref();
3003         RETURN(rc);
3004 }
3005
3006 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3007 {
3008         int rc = 0;
3009         ENTRY;
3010
3011         switch (stage) {
3012         case OBD_CLEANUP_EARLY: {
3013                 struct obd_import *imp;
3014                 imp = obd->u.cli.cl_import;
3015                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3016                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3017                 ptlrpc_deactivate_import(imp);
3018                 spin_lock(&imp->imp_lock);
3019                 imp->imp_pingable = 0;
3020                 spin_unlock(&imp->imp_lock);
3021                 break;
3022         }
3023         case OBD_CLEANUP_EXPORTS: {
3024                 struct client_obd *cli = &obd->u.cli;
3025                 /* LU-464
3026                  * for echo client, export may be on zombie list, wait for
3027                  * zombie thread to cull it, because cli.cl_import will be
3028                  * cleared in client_disconnect_export():
3029                  *   class_export_destroy() -> obd_cleanup() ->
3030                  *   echo_device_free() -> echo_client_cleanup() ->
3031                  *   obd_disconnect() -> osc_disconnect() ->
3032                  *   client_disconnect_export()
3033                  */
3034                 obd_zombie_barrier();
3035                 if (cli->cl_writeback_work) {
3036                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3037                         cli->cl_writeback_work = NULL;
3038                 }
3039                 if (cli->cl_lru_work) {
3040                         ptlrpcd_destroy_work(cli->cl_lru_work);
3041                         cli->cl_lru_work = NULL;
3042                 }
3043                 obd_cleanup_client_import(obd);
3044                 ptlrpc_lprocfs_unregister_obd(obd);
3045                 lprocfs_obd_cleanup(obd);
3046                 break;
3047                 }
3048         }
3049         RETURN(rc);
3050 }
3051
3052 int osc_cleanup(struct obd_device *obd)
3053 {
3054         struct client_obd *cli = &obd->u.cli;
3055         int rc;
3056
3057         ENTRY;
3058
3059         /* lru cleanup */
3060         if (cli->cl_cache != NULL) {
3061                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3062                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3063                 list_del_init(&cli->cl_lru_osc);
3064                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3065                 cli->cl_lru_left = NULL;
3066                 atomic_dec(&cli->cl_cache->ccc_users);
3067                 cli->cl_cache = NULL;
3068         }
3069
3070         /* free memory of osc quota cache */
3071         osc_quota_cleanup(obd);
3072
3073         rc = client_obd_cleanup(obd);
3074
3075         ptlrpcd_decref();
3076         RETURN(rc);
3077 }
3078
3079 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3080 {
3081         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3082         return rc > 0 ? 0: rc;
3083 }
3084
3085 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3086 {
3087         return osc_process_config_base(obd, buf);
3088 }
3089
3090 static struct obd_ops osc_obd_ops = {
3091         .o_owner                = THIS_MODULE,
3092         .o_setup                = osc_setup,
3093         .o_precleanup           = osc_precleanup,
3094         .o_cleanup              = osc_cleanup,
3095         .o_add_conn             = client_import_add_conn,
3096         .o_del_conn             = client_import_del_conn,
3097         .o_connect              = client_connect_import,
3098         .o_reconnect            = osc_reconnect,
3099         .o_disconnect           = osc_disconnect,
3100         .o_statfs               = osc_statfs,
3101         .o_statfs_async         = osc_statfs_async,
3102         .o_create               = osc_create,
3103         .o_destroy              = osc_destroy,
3104         .o_getattr              = osc_getattr,
3105         .o_getattr_async        = osc_getattr_async,
3106         .o_setattr              = osc_setattr,
3107         .o_setattr_async        = osc_setattr_async,
3108         .o_change_cbdata        = osc_change_cbdata,
3109         .o_find_cbdata          = osc_find_cbdata,
3110         .o_iocontrol            = osc_iocontrol,
3111         .o_get_info             = osc_get_info,
3112         .o_set_info_async       = osc_set_info_async,
3113         .o_import_event         = osc_import_event,
3114         .o_process_config       = osc_process_config,
3115         .o_quotactl             = osc_quotactl,
3116         .o_quotacheck           = osc_quotacheck,
3117 };
3118
3119 static int __init osc_init(void)
3120 {
3121         bool enable_proc = true;
3122         struct obd_type *type;
3123         int rc;
3124         ENTRY;
3125
3126         /* print an address of _any_ initialized kernel symbol from this
3127          * module, to allow debugging with gdb that doesn't support data
3128          * symbols from modules.*/
3129         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3130
3131         rc = lu_kmem_init(osc_caches);
3132         if (rc)
3133                 RETURN(rc);
3134
3135         type = class_search_type(LUSTRE_OSP_NAME);
3136         if (type != NULL && type->typ_procsym != NULL)
3137                 enable_proc = false;
3138
3139         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3140                                  LUSTRE_OSC_NAME, &osc_device_type);
3141         if (rc) {
3142                 lu_kmem_fini(osc_caches);
3143                 RETURN(rc);
3144         }
3145
3146         RETURN(rc);
3147 }
3148
3149 static void /*__exit*/ osc_exit(void)
3150 {
3151         class_unregister_type(LUSTRE_OSC_NAME);
3152         lu_kmem_fini(osc_caches);
3153 }
3154
3155 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3156 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3157 MODULE_LICENSE("GPL");
3158
3159 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);