Whamcloud - gitweb
3eb7c7cb776c75623106db61c30b294a4bc614fb
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
54
55 struct osc_brw_async_args {
56         struct obdo              *aa_oa;
57         int                       aa_requested_nob;
58         int                       aa_nio_count;
59         obd_count                 aa_page_count;
60         int                       aa_resends;
61         struct brw_page **aa_ppga;
62         struct client_obd        *aa_cli;
63         struct list_head          aa_oaps;
64         struct list_head          aa_exts;
65         struct obd_capa  *aa_ocapa;
66         struct cl_req            *aa_clerq;
67 };
68
69 #define osc_grant_args osc_brw_async_args
70
/*
 * Async arguments for osc_getattr_async(): the caller's obd_info, whose
 * oi_oa is filled in and whose oi_cb_up is invoked by
 * osc_getattr_interpret().
 */
struct osc_async_args {
        struct obd_info *aa_oi;
};
74
75 struct osc_setattr_args {
76         struct obdo             *sa_oa;
77         obd_enqueue_update_f     sa_upcall;
78         void                    *sa_cookie;
79 };
80
81 struct osc_fsync_args {
82         struct obd_info *fa_oi;
83         obd_enqueue_update_f     fa_upcall;
84         void                    *fa_cookie;
85 };
86
87 struct osc_enqueue_args {
88         struct obd_export       *oa_exp;
89         ldlm_type_t             oa_type;
90         ldlm_mode_t             oa_mode;
91         __u64                   *oa_flags;
92         osc_enqueue_upcall_f    oa_upcall;
93         void                    *oa_cookie;
94         struct ost_lvb          *oa_lvb;
95         struct lustre_handle    oa_lockh;
96         unsigned int            oa_agl:1;
97 };
98
99 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
100 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
101                          void *data, int rc);
102
103 static inline void osc_pack_capa(struct ptlrpc_request *req,
104                                  struct ost_body *body, void *capa)
105 {
106         struct obd_capa *oc = (struct obd_capa *)capa;
107         struct lustre_capa *c;
108
109         if (!capa)
110                 return;
111
112         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
113         LASSERT(c);
114         capa_cpy(c, oc);
115         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
116         DEBUG_CAPA(D_SEC, c, "pack");
117 }
118
119 static inline void osc_pack_req_body(struct ptlrpc_request *req,
120                                      struct obd_info *oinfo)
121 {
122         struct ost_body *body;
123
124         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
125         LASSERT(body);
126
127         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
128                              oinfo->oi_oa);
129         osc_pack_capa(req, body, oinfo->oi_capa);
130 }
131
132 static inline void osc_set_capa_size(struct ptlrpc_request *req,
133                                      const struct req_msg_field *field,
134                                      struct obd_capa *oc)
135 {
136         if (oc == NULL)
137                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
138         else
139                 /* it is already calculated as sizeof struct obd_capa */
140                 ;
141 }
142
143 static int osc_getattr_interpret(const struct lu_env *env,
144                                  struct ptlrpc_request *req,
145                                  struct osc_async_args *aa, int rc)
146 {
147         struct ost_body *body;
148         ENTRY;
149
150         if (rc != 0)
151                 GOTO(out, rc);
152
153         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
154         if (body) {
155                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
156                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
157                                      aa->aa_oi->oi_oa, &body->oa);
158
159                 /* This should really be sent by the OST */
160                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
161                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
162         } else {
163                 CDEBUG(D_INFO, "can't unpack ost_body\n");
164                 rc = -EPROTO;
165                 aa->aa_oi->oi_oa->o_valid = 0;
166         }
167 out:
168         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
169         RETURN(rc);
170 }
171
172 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
173                              struct ptlrpc_request_set *set)
174 {
175         struct ptlrpc_request *req;
176         struct osc_async_args *aa;
177         int                    rc;
178         ENTRY;
179
180         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
181         if (req == NULL)
182                 RETURN(-ENOMEM);
183
184         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
185         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
186         if (rc) {
187                 ptlrpc_request_free(req);
188                 RETURN(rc);
189         }
190
191         osc_pack_req_body(req, oinfo);
192
193         ptlrpc_request_set_replen(req);
194         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
195
196         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
197         aa = ptlrpc_req_async_args(req);
198         aa->aa_oi = oinfo;
199
200         ptlrpc_set_add_req(set, req);
201         RETURN(0);
202 }
203
204 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
205                        struct obd_info *oinfo)
206 {
207         struct ptlrpc_request *req;
208         struct ost_body       *body;
209         int                    rc;
210         ENTRY;
211
212         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
213         if (req == NULL)
214                 RETURN(-ENOMEM);
215
216         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
217         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
218         if (rc) {
219                 ptlrpc_request_free(req);
220                 RETURN(rc);
221         }
222
223         osc_pack_req_body(req, oinfo);
224
225         ptlrpc_request_set_replen(req);
226
227         rc = ptlrpc_queue_wait(req);
228         if (rc)
229                 GOTO(out, rc);
230
231         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
232         if (body == NULL)
233                 GOTO(out, rc = -EPROTO);
234
235         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
236         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
237                              &body->oa);
238
239         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
240         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
241
242         EXIT;
243  out:
244         ptlrpc_req_finished(req);
245         return rc;
246 }
247
248 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
249                        struct obd_info *oinfo, struct obd_trans_info *oti)
250 {
251         struct ptlrpc_request *req;
252         struct ost_body       *body;
253         int                    rc;
254         ENTRY;
255
256         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
257
258         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
259         if (req == NULL)
260                 RETURN(-ENOMEM);
261
262         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
263         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
264         if (rc) {
265                 ptlrpc_request_free(req);
266                 RETURN(rc);
267         }
268
269         osc_pack_req_body(req, oinfo);
270
271         ptlrpc_request_set_replen(req);
272
273         rc = ptlrpc_queue_wait(req);
274         if (rc)
275                 GOTO(out, rc);
276
277         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
278         if (body == NULL)
279                 GOTO(out, rc = -EPROTO);
280
281         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
282                              &body->oa);
283
284         EXIT;
285 out:
286         ptlrpc_req_finished(req);
287         RETURN(rc);
288 }
289
290 static int osc_setattr_interpret(const struct lu_env *env,
291                                  struct ptlrpc_request *req,
292                                  struct osc_setattr_args *sa, int rc)
293 {
294         struct ost_body *body;
295         ENTRY;
296
297         if (rc != 0)
298                 GOTO(out, rc);
299
300         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
301         if (body == NULL)
302                 GOTO(out, rc = -EPROTO);
303
304         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
305                              &body->oa);
306 out:
307         rc = sa->sa_upcall(sa->sa_cookie, rc);
308         RETURN(rc);
309 }
310
311 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
312                            struct obd_trans_info *oti,
313                            obd_enqueue_update_f upcall, void *cookie,
314                            struct ptlrpc_request_set *rqset)
315 {
316         struct ptlrpc_request   *req;
317         struct osc_setattr_args *sa;
318         int                      rc;
319         ENTRY;
320
321         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
322         if (req == NULL)
323                 RETURN(-ENOMEM);
324
325         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
326         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
327         if (rc) {
328                 ptlrpc_request_free(req);
329                 RETURN(rc);
330         }
331
332         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
333                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
334
335         osc_pack_req_body(req, oinfo);
336
337         ptlrpc_request_set_replen(req);
338
339         /* do mds to ost setattr asynchronously */
340         if (!rqset) {
341                 /* Do not wait for response. */
342                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
343         } else {
344                 req->rq_interpret_reply =
345                         (ptlrpc_interpterer_t)osc_setattr_interpret;
346
347                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
348                 sa = ptlrpc_req_async_args(req);
349                 sa->sa_oa = oinfo->oi_oa;
350                 sa->sa_upcall = upcall;
351                 sa->sa_cookie = cookie;
352
353                 if (rqset == PTLRPCD_SET)
354                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
355                 else
356                         ptlrpc_set_add_req(rqset, req);
357         }
358
359         RETURN(0);
360 }
361
362 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
363                              struct obd_trans_info *oti,
364                              struct ptlrpc_request_set *rqset)
365 {
366         return osc_setattr_async_base(exp, oinfo, oti,
367                                       oinfo->oi_cb_up, oinfo, rqset);
368 }
369
370 static int osc_create(const struct lu_env *env, struct obd_export *exp,
371                       struct obdo *oa, struct obd_trans_info *oti)
372 {
373         struct ptlrpc_request *req;
374         struct ost_body       *body;
375         int                    rc;
376         ENTRY;
377
378         LASSERT(oa != NULL);
379         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
380         LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
381
382         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
383         if (req == NULL)
384                 GOTO(out, rc = -ENOMEM);
385
386         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
387         if (rc) {
388                 ptlrpc_request_free(req);
389                 GOTO(out, rc);
390         }
391
392         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
393         LASSERT(body);
394
395         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
396
397         ptlrpc_request_set_replen(req);
398
399         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
400             oa->o_flags == OBD_FL_DELORPHAN) {
401                 DEBUG_REQ(D_HA, req,
402                           "delorphan from OST integration");
403                 /* Don't resend the delorphan req */
404                 req->rq_no_resend = req->rq_no_delay = 1;
405         }
406
407         rc = ptlrpc_queue_wait(req);
408         if (rc)
409                 GOTO(out_req, rc);
410
411         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
412         if (body == NULL)
413                 GOTO(out_req, rc = -EPROTO);
414
415         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
416         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
417
418         oa->o_blksize = cli_brw_size(exp->exp_obd);
419         oa->o_valid |= OBD_MD_FLBLKSZ;
420
421         if (oti != NULL) {
422                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
423                         if (oti->oti_logcookies == NULL)
424                                 oti->oti_logcookies = &oti->oti_onecookie;
425
426                         *oti->oti_logcookies = oa->o_lcookie;
427                 }
428         }
429
430         CDEBUG(D_HA, "transno: "LPD64"\n",
431                lustre_msg_get_transno(req->rq_repmsg));
432 out_req:
433         ptlrpc_req_finished(req);
434 out:
435         RETURN(rc);
436 }
437
438 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
439                    obd_enqueue_update_f upcall, void *cookie,
440                    struct ptlrpc_request_set *rqset)
441 {
442         struct ptlrpc_request   *req;
443         struct osc_setattr_args *sa;
444         struct ost_body         *body;
445         int                      rc;
446         ENTRY;
447
448         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
449         if (req == NULL)
450                 RETURN(-ENOMEM);
451
452         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
453         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
454         if (rc) {
455                 ptlrpc_request_free(req);
456                 RETURN(rc);
457         }
458         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
459         ptlrpc_at_set_req_timeout(req);
460
461         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
462         LASSERT(body);
463         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
464                              oinfo->oi_oa);
465         osc_pack_capa(req, body, oinfo->oi_capa);
466
467         ptlrpc_request_set_replen(req);
468
469         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
470         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
471         sa = ptlrpc_req_async_args(req);
472         sa->sa_oa     = oinfo->oi_oa;
473         sa->sa_upcall = upcall;
474         sa->sa_cookie = cookie;
475         if (rqset == PTLRPCD_SET)
476                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
477         else
478                 ptlrpc_set_add_req(rqset, req);
479
480         RETURN(0);
481 }
482
483 static int osc_sync_interpret(const struct lu_env *env,
484                               struct ptlrpc_request *req,
485                               void *arg, int rc)
486 {
487         struct osc_fsync_args *fa = arg;
488         struct ost_body *body;
489         ENTRY;
490
491         if (rc)
492                 GOTO(out, rc);
493
494         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
495         if (body == NULL) {
496                 CERROR ("can't unpack ost_body\n");
497                 GOTO(out, rc = -EPROTO);
498         }
499
500         *fa->fa_oi->oi_oa = body->oa;
501 out:
502         rc = fa->fa_upcall(fa->fa_cookie, rc);
503         RETURN(rc);
504 }
505
506 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
507                   obd_enqueue_update_f upcall, void *cookie,
508                   struct ptlrpc_request_set *rqset)
509 {
510         struct ptlrpc_request *req;
511         struct ost_body       *body;
512         struct osc_fsync_args *fa;
513         int                    rc;
514         ENTRY;
515
516         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
517         if (req == NULL)
518                 RETURN(-ENOMEM);
519
520         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
521         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
522         if (rc) {
523                 ptlrpc_request_free(req);
524                 RETURN(rc);
525         }
526
527         /* overload the size and blocks fields in the oa with start/end */
528         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
529         LASSERT(body);
530         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
531                              oinfo->oi_oa);
532         osc_pack_capa(req, body, oinfo->oi_capa);
533
534         ptlrpc_request_set_replen(req);
535         req->rq_interpret_reply = osc_sync_interpret;
536
537         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
538         fa = ptlrpc_req_async_args(req);
539         fa->fa_oi = oinfo;
540         fa->fa_upcall = upcall;
541         fa->fa_cookie = cookie;
542
543         if (rqset == PTLRPCD_SET)
544                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
545         else
546                 ptlrpc_set_add_req(rqset, req);
547
548         RETURN (0);
549 }
550
551 /* Find and cancel locally locks matched by @mode in the resource found by
552  * @objid. Found locks are added into @cancel list. Returns the amount of
553  * locks added to @cancels list. */
554 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
555                                    struct list_head *cancels,
556                                    ldlm_mode_t mode, __u64 lock_flags)
557 {
558         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
559         struct ldlm_res_id res_id;
560         struct ldlm_resource *res;
561         int count;
562         ENTRY;
563
564         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
565          * export) but disabled through procfs (flag in NS).
566          *
567          * This distinguishes from a case when ELC is not supported originally,
568          * when we still want to cancel locks in advance and just cancel them
569          * locally, without sending any RPC. */
570         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
571                 RETURN(0);
572
573         ostid_build_res_name(&oa->o_oi, &res_id);
574         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
575         if (IS_ERR(res))
576                 RETURN(0);
577
578         LDLM_RESOURCE_ADDREF(res);
579         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
580                                            lock_flags, 0, NULL);
581         LDLM_RESOURCE_DELREF(res);
582         ldlm_resource_putref(res);
583         RETURN(count);
584 }
585
586 static int osc_destroy_interpret(const struct lu_env *env,
587                                  struct ptlrpc_request *req, void *data,
588                                  int rc)
589 {
590         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
591
592         atomic_dec(&cli->cl_destroy_in_flight);
593         wake_up(&cli->cl_destroy_waitq);
594         return 0;
595 }
596
597 static int osc_can_send_destroy(struct client_obd *cli)
598 {
599         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
600             cli->cl_max_rpcs_in_flight) {
601                 /* The destroy request can be sent */
602                 return 1;
603         }
604         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
605             cli->cl_max_rpcs_in_flight) {
606                 /*
607                  * The counter has been modified between the two atomic
608                  * operations.
609                  */
610                 wake_up(&cli->cl_destroy_waitq);
611         }
612         return 0;
613 }
614
615 /* Destroy requests can be async always on the client, and we don't even really
616  * care about the return code since the client cannot do anything at all about
617  * a destroy failure.
618  * When the MDS is unlinking a filename, it saves the file objects into a
619  * recovery llog, and these object records are cancelled when the OST reports
620  * they were destroyed and sync'd to disk (i.e. transaction committed).
621  * If the client dies, or the OST is down when the object should be destroyed,
622  * the records are not cancelled, and when the OST reconnects to the MDS next,
623  * it will retrieve the llog unlink logs and then sends the log cancellation
624  * cookies to the MDS after committing destroy transactions. */
625 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
626                        struct obdo *oa, struct lov_stripe_md *ea,
627                        struct obd_trans_info *oti, struct obd_export *md_export,
628                        void *capa)
629 {
630         struct client_obd     *cli = &exp->exp_obd->u.cli;
631         struct ptlrpc_request *req;
632         struct ost_body       *body;
633         struct list_head       cancels = LIST_HEAD_INIT(cancels);
634         int rc, count;
635         ENTRY;
636
637         if (!oa) {
638                 CDEBUG(D_INFO, "oa NULL\n");
639                 RETURN(-EINVAL);
640         }
641
642         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
643                                         LDLM_FL_DISCARD_DATA);
644
645         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
646         if (req == NULL) {
647                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
648                 RETURN(-ENOMEM);
649         }
650
651         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
652         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
653                                0, &cancels, count);
654         if (rc) {
655                 ptlrpc_request_free(req);
656                 RETURN(rc);
657         }
658
659         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
660         ptlrpc_at_set_req_timeout(req);
661
662         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
663                 oa->o_lcookie = *oti->oti_logcookies;
664         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
665         LASSERT(body);
666         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
667
668         osc_pack_capa(req, body, (struct obd_capa *)capa);
669         ptlrpc_request_set_replen(req);
670
671         /* If osc_destory is for destroying the unlink orphan,
672          * sent from MDT to OST, which should not be blocked here,
673          * because the process might be triggered by ptlrpcd, and
674          * it is not good to block ptlrpcd thread (b=16006)*/
675         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
676                 req->rq_interpret_reply = osc_destroy_interpret;
677                 if (!osc_can_send_destroy(cli)) {
678                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
679                                                           NULL);
680
681                         /*
682                          * Wait until the number of on-going destroy RPCs drops
683                          * under max_rpc_in_flight
684                          */
685                         l_wait_event_exclusive(cli->cl_destroy_waitq,
686                                                osc_can_send_destroy(cli), &lwi);
687                 }
688         }
689
690         /* Do not wait for response */
691         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
692         RETURN(0);
693 }
694
695 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
696                                 long writing_bytes)
697 {
698         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
699
700         LASSERT(!(oa->o_valid & bits));
701
702         oa->o_valid |= bits;
703         client_obd_list_lock(&cli->cl_loi_list_lock);
704         oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
705         if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
706                      cli->cl_dirty_max_pages)) {
707                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
708                        cli->cl_dirty_pages, cli->cl_dirty_transit,
709                        cli->cl_dirty_max_pages);
710                 oa->o_undirty = 0;
711         } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
712                             atomic_long_read(&obd_dirty_transit_pages) >
713                             (obd_max_dirty_pages + 1))) {
714                 /* The atomic_read() allowing the atomic_inc() are
715                  * not covered by a lock thus they may safely race and trip
716                  * this CERROR() unless we add in a small fudge factor (+1). */
717                 CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
718                        cli->cl_import->imp_obd->obd_name,
719                        atomic_long_read(&obd_dirty_pages),
720                        atomic_long_read(&obd_dirty_transit_pages),
721                        obd_max_dirty_pages);
722                 oa->o_undirty = 0;
723         } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
724                             0x7fffffff)) {
725                 CERROR("dirty %lu - dirty_max %lu too big???\n",
726                        cli->cl_dirty_pages, cli->cl_dirty_max_pages);
727                 oa->o_undirty = 0;
728         } else {
729                 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
730                                       PAGE_CACHE_SHIFT) *
731                                      (cli->cl_max_rpcs_in_flight + 1);
732                 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
733                                     max_in_flight);
734         }
735         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
736         oa->o_dropped = cli->cl_lost_grant;
737         cli->cl_lost_grant = 0;
738         client_obd_list_unlock(&cli->cl_loi_list_lock);
739         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
740                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
741
742 }
743
744 void osc_update_next_shrink(struct client_obd *cli)
745 {
746         cli->cl_next_shrink_grant =
747                 cfs_time_shift(cli->cl_grant_shrink_interval);
748         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
749                cli->cl_next_shrink_grant);
750 }
751
752 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
753 {
754         client_obd_list_lock(&cli->cl_loi_list_lock);
755         cli->cl_avail_grant += grant;
756         client_obd_list_unlock(&cli->cl_loi_list_lock);
757 }
758
759 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
760 {
761         if (body->oa.o_valid & OBD_MD_FLGRANT) {
762                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
763                 __osc_update_grant(cli, body->oa.o_grant);
764         }
765 }
766
767 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
768                               obd_count keylen, void *key, obd_count vallen,
769                               void *val, struct ptlrpc_request_set *set);
770
771 static int osc_shrink_grant_interpret(const struct lu_env *env,
772                                       struct ptlrpc_request *req,
773                                       void *aa, int rc)
774 {
775         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
776         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
777         struct ost_body *body;
778
779         if (rc != 0) {
780                 __osc_update_grant(cli, oa->o_grant);
781                 GOTO(out, rc);
782         }
783
784         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
785         LASSERT(body);
786         osc_update_grant(cli, body);
787 out:
788         OBDO_FREE(oa);
789         return rc;
790 }
791
792 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
793 {
794         client_obd_list_lock(&cli->cl_loi_list_lock);
795         oa->o_grant = cli->cl_avail_grant / 4;
796         cli->cl_avail_grant -= oa->o_grant;
797         client_obd_list_unlock(&cli->cl_loi_list_lock);
798         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
799                 oa->o_valid |= OBD_MD_FLFLAGS;
800                 oa->o_flags = 0;
801         }
802         oa->o_flags |= OBD_FL_SHRINK_GRANT;
803         osc_update_next_shrink(cli);
804 }
805
806 /* Shrink the current grant, either from some large amount to enough for a
807  * full set of in-flight RPCs, or if we have already shrunk to that limit
808  * then to enough for a single RPC.  This avoids keeping more grant than
809  * needed, and avoids shrinking the grant piecemeal. */
810 static int osc_shrink_grant(struct client_obd *cli)
811 {
812         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
813                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
814
815         client_obd_list_lock(&cli->cl_loi_list_lock);
816         if (cli->cl_avail_grant <= target_bytes)
817                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
818         client_obd_list_unlock(&cli->cl_loi_list_lock);
819
820         return osc_shrink_grant_to_target(cli, target_bytes);
821 }
822
823 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
824 {
825         int                     rc = 0;
826         struct ost_body        *body;
827         ENTRY;
828
829         client_obd_list_lock(&cli->cl_loi_list_lock);
830         /* Don't shrink if we are already above or below the desired limit
831          * We don't want to shrink below a single RPC, as that will negatively
832          * impact block allocation and long-term performance. */
833         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
834                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
835
836         if (target_bytes >= cli->cl_avail_grant) {
837                 client_obd_list_unlock(&cli->cl_loi_list_lock);
838                 RETURN(0);
839         }
840         client_obd_list_unlock(&cli->cl_loi_list_lock);
841
842         OBD_ALLOC_PTR(body);
843         if (!body)
844                 RETURN(-ENOMEM);
845
846         osc_announce_cached(cli, &body->oa, 0);
847
848         client_obd_list_lock(&cli->cl_loi_list_lock);
849         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
850         cli->cl_avail_grant = target_bytes;
851         client_obd_list_unlock(&cli->cl_loi_list_lock);
852         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
853                 body->oa.o_valid |= OBD_MD_FLFLAGS;
854                 body->oa.o_flags = 0;
855         }
856         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
857         osc_update_next_shrink(cli);
858
859         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
860                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
861                                 sizeof(*body), body, NULL);
862         if (rc != 0)
863                 __osc_update_grant(cli, body->oa.o_grant);
864         OBD_FREE_PTR(body);
865         RETURN(rc);
866 }
867
868 static int osc_should_shrink_grant(struct client_obd *client)
869 {
870         cfs_time_t time = cfs_time_current();
871         cfs_time_t next_shrink = client->cl_next_shrink_grant;
872
873         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
874              OBD_CONNECT_GRANT_SHRINK) == 0)
875                 return 0;
876
877         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
878                 /* Get the current RPC size directly, instead of going via:
879                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
880                  * Keep comment here so that it can be found by searching. */
881                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
882
883                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
884                     client->cl_avail_grant > brw_size)
885                         return 1;
886                 else
887                         osc_update_next_shrink(client);
888         }
889         return 0;
890 }
891
892 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
893 {
894         struct client_obd *client;
895
896         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
897                 if (osc_should_shrink_grant(client))
898                         osc_shrink_grant(client);
899         }
900         return 0;
901 }
902
903 static int osc_add_shrink_grant(struct client_obd *client)
904 {
905         int rc;
906
907         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
908                                        TIMEOUT_GRANT,
909                                        osc_grant_shrink_grant_cb, NULL,
910                                        &client->cl_grant_shrink_list);
911         if (rc) {
912                 CERROR("add grant client %s error %d\n",
913                         client->cl_import->imp_obd->obd_name, rc);
914                 return rc;
915         }
916         CDEBUG(D_CACHE, "add grant client %s \n",
917                client->cl_import->imp_obd->obd_name);
918         osc_update_next_shrink(client);
919         return 0;
920 }
921
922 static int osc_del_shrink_grant(struct client_obd *client)
923 {
924         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
925                                          TIMEOUT_GRANT);
926 }
927
928 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
929 {
930         /*
931          * ocd_grant is the total grant amount we're expect to hold: if we've
932          * been evicted, it's the new avail_grant amount, cl_dirty_pages will
933          * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
934          * dirty.
935          *
936          * race is tolerable here: if we're evicted, but imp_state already
937          * left EVICTED state, then cl_dirty_pages must be 0 already.
938          */
939         client_obd_list_lock(&cli->cl_loi_list_lock);
940         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
941                 cli->cl_avail_grant = ocd->ocd_grant;
942         else
943                 cli->cl_avail_grant = ocd->ocd_grant -
944                                       (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
945
946         if (cli->cl_avail_grant < 0) {
947                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
948                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
949                       ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
950                 /* workaround for servers which do not have the patch from
951                  * LU-2679 */
952                 cli->cl_avail_grant = ocd->ocd_grant;
953         }
954
955         /* determine the appropriate chunk size used by osc_extent. */
956         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
957         client_obd_list_unlock(&cli->cl_loi_list_lock);
958
959         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
960                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
961                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
962
963         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
964             list_empty(&cli->cl_grant_shrink_list))
965                 osc_add_shrink_grant(cli);
966 }
967
968 /* We assume that the reason this OSC got a short read is because it read
969  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
970  * via the LOV, and it _knows_ it's reading inside the file, it's just that
971  * this stripe never got written at or beyond this stripe offset yet. */
972 static void handle_short_read(int nob_read, obd_count page_count,
973                               struct brw_page **pga)
974 {
975         char *ptr;
976         int i = 0;
977
978         /* skip bytes read OK */
979         while (nob_read > 0) {
980                 LASSERT (page_count > 0);
981
982                 if (pga[i]->count > nob_read) {
983                         /* EOF inside this page */
984                         ptr = kmap(pga[i]->pg) +
985                                 (pga[i]->off & ~CFS_PAGE_MASK);
986                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
987                         kunmap(pga[i]->pg);
988                         page_count--;
989                         i++;
990                         break;
991                 }
992
993                 nob_read -= pga[i]->count;
994                 page_count--;
995                 i++;
996         }
997
998         /* zero remaining pages */
999         while (page_count-- > 0) {
1000                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1001                 memset(ptr, 0, pga[i]->count);
1002                 kunmap(pga[i]->pg);
1003                 i++;
1004         }
1005 }
1006
1007 static int check_write_rcs(struct ptlrpc_request *req,
1008                            int requested_nob, int niocount,
1009                            obd_count page_count, struct brw_page **pga)
1010 {
1011         int     i;
1012         __u32   *remote_rcs;
1013
1014         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1015                                                   sizeof(*remote_rcs) *
1016                                                   niocount);
1017         if (remote_rcs == NULL) {
1018                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1019                 return(-EPROTO);
1020         }
1021
1022         /* return error if any niobuf was in error */
1023         for (i = 0; i < niocount; i++) {
1024                 if ((int)remote_rcs[i] < 0)
1025                         return(remote_rcs[i]);
1026
1027                 if (remote_rcs[i] != 0) {
1028                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1029                                 i, remote_rcs[i], req);
1030                         return(-EPROTO);
1031                 }
1032         }
1033
1034         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1035                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1036                        req->rq_bulk->bd_nob_transferred, requested_nob);
1037                 return(-EPROTO);
1038         }
1039
1040         return (0);
1041 }
1042
1043 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1044 {
1045         if (p1->flag != p2->flag) {
1046                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1047                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1048                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1049
1050                 /* warn if we try to combine flags that we don't know to be
1051                  * safe to combine */
1052                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1053                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1054                               "report this at https://jira.hpdd.intel.com/\n",
1055                               p1->flag, p2->flag);
1056                 }
1057                 return 0;
1058         }
1059
1060         return (p1->off + p1->count == p2->off);
1061 }
1062
1063 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1064                                    struct brw_page **pga, int opc,
1065                                    cksum_type_t cksum_type)
1066 {
1067         __u32                           cksum;
1068         int                             i = 0;
1069         struct cfs_crypto_hash_desc     *hdesc;
1070         unsigned int                    bufsize;
1071         int                             err;
1072         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1073
1074         LASSERT(pg_count > 0);
1075
1076         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1077         if (IS_ERR(hdesc)) {
1078                 CERROR("Unable to initialize checksum hash %s\n",
1079                        cfs_crypto_hash_name(cfs_alg));
1080                 return PTR_ERR(hdesc);
1081         }
1082
1083         while (nob > 0 && pg_count > 0) {
1084                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1085
1086                 /* corrupt the data before we compute the checksum, to
1087                  * simulate an OST->client data error */
1088                 if (i == 0 && opc == OST_READ &&
1089                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1090                         unsigned char *ptr = kmap(pga[i]->pg);
1091                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1092
1093                         memcpy(ptr + off, "bad1", min(4, nob));
1094                         kunmap(pga[i]->pg);
1095                 }
1096                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1097                                             pga[i]->off & ~CFS_PAGE_MASK,
1098                                             count);
1099                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1100                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1101
1102                 nob -= pga[i]->count;
1103                 pg_count--;
1104                 i++;
1105         }
1106
1107         bufsize = sizeof(cksum);
1108         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1109
1110         /* For sending we only compute the wrong checksum instead
1111          * of corrupting the data so it is still correct on a redo */
1112         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1113                 cksum++;
1114
1115         return cksum;
1116 }
1117
1118 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1119                                 struct lov_stripe_md *lsm, obd_count page_count,
1120                                 struct brw_page **pga,
1121                                 struct ptlrpc_request **reqp,
1122                                 struct obd_capa *ocapa, int reserve,
1123                                 int resend)
1124 {
1125         struct ptlrpc_request   *req;
1126         struct ptlrpc_bulk_desc *desc;
1127         struct ost_body         *body;
1128         struct obd_ioobj        *ioobj;
1129         struct niobuf_remote    *niobuf;
1130         int niocount, i, requested_nob, opc, rc;
1131         struct osc_brw_async_args *aa;
1132         struct req_capsule      *pill;
1133         struct brw_page *pg_prev;
1134
1135         ENTRY;
1136         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1137                 RETURN(-ENOMEM); /* Recoverable */
1138         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1139                 RETURN(-EINVAL); /* Fatal */
1140
1141         if ((cmd & OBD_BRW_WRITE) != 0) {
1142                 opc = OST_WRITE;
1143                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1144                                                 cli->cl_import->imp_rq_pool,
1145                                                 &RQF_OST_BRW_WRITE);
1146         } else {
1147                 opc = OST_READ;
1148                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1149         }
1150         if (req == NULL)
1151                 RETURN(-ENOMEM);
1152
1153         for (niocount = i = 1; i < page_count; i++) {
1154                 if (!can_merge_pages(pga[i - 1], pga[i]))
1155                         niocount++;
1156         }
1157
1158         pill = &req->rq_pill;
1159         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1160                              sizeof(*ioobj));
1161         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1162                              niocount * sizeof(*niobuf));
1163         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1164
1165         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1166         if (rc) {
1167                 ptlrpc_request_free(req);
1168                 RETURN(rc);
1169         }
1170         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1171         ptlrpc_at_set_req_timeout(req);
1172         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1173          * retry logic */
1174         req->rq_no_retry_einprogress = 1;
1175
1176         desc = ptlrpc_prep_bulk_imp(req, page_count,
1177                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1178                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1179                 OST_BULK_PORTAL);
1180
1181         if (desc == NULL)
1182                 GOTO(out, rc = -ENOMEM);
1183         /* NB request now owns desc and will free it when it gets freed */
1184
1185         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1186         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1187         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1188         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1189
1190         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1191
1192         obdo_to_ioobj(oa, ioobj);
1193         ioobj->ioo_bufcnt = niocount;
1194         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1195          * that might be send for this request.  The actual number is decided
1196          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1197          * "max - 1" for old client compatibility sending "0", and also so the
1198          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1199         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1200         osc_pack_capa(req, body, ocapa);
1201         LASSERT(page_count > 0);
1202         pg_prev = pga[0];
1203         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1204                 struct brw_page *pg = pga[i];
1205                 int poff = pg->off & ~CFS_PAGE_MASK;
1206
1207                 LASSERT(pg->count > 0);
1208                 /* make sure there is no gap in the middle of page array */
1209                 LASSERTF(page_count == 1 ||
1210                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1211                           ergo(i > 0 && i < page_count - 1,
1212                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1213                           ergo(i == page_count - 1, poff == 0)),
1214                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1215                          i, page_count, pg, pg->off, pg->count);
1216                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1217                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1218                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1219                          i, page_count,
1220                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1221                          pg_prev->pg, page_private(pg_prev->pg),
1222                          pg_prev->pg->index, pg_prev->off);
1223                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1224                         (pg->flag & OBD_BRW_SRVLOCK));
1225
1226                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1227                 requested_nob += pg->count;
1228
1229                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1230                         niobuf--;
1231                         niobuf->rnb_len += pg->count;
1232                 } else {
1233                         niobuf->rnb_offset = pg->off;
1234                         niobuf->rnb_len    = pg->count;
1235                         niobuf->rnb_flags  = pg->flag;
1236                 }
1237                 pg_prev = pg;
1238         }
1239
1240         LASSERTF((void *)(niobuf - niocount) ==
1241                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1242                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1243                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1244
1245         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1246         if (resend) {
1247                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1248                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1249                         body->oa.o_flags = 0;
1250                 }
1251                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1252         }
1253
1254         if (osc_should_shrink_grant(cli))
1255                 osc_shrink_grant_local(cli, &body->oa);
1256
1257         /* size[REQ_REC_OFF] still sizeof (*body) */
1258         if (opc == OST_WRITE) {
1259                 if (cli->cl_checksum &&
1260                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1261                         /* store cl_cksum_type in a local variable since
1262                          * it can be changed via lprocfs */
1263                         cksum_type_t cksum_type = cli->cl_cksum_type;
1264
1265                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1266                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1267                                 body->oa.o_flags = 0;
1268                         }
1269                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1270                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1271                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1272                                                              page_count, pga,
1273                                                              OST_WRITE,
1274                                                              cksum_type);
1275                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1276                                body->oa.o_cksum);
1277                         /* save this in 'oa', too, for later checking */
1278                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1279                         oa->o_flags |= cksum_type_pack(cksum_type);
1280                 } else {
1281                         /* clear out the checksum flag, in case this is a
1282                          * resend but cl_checksum is no longer set. b=11238 */
1283                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1284                 }
1285                 oa->o_cksum = body->oa.o_cksum;
1286                 /* 1 RC per niobuf */
1287                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1288                                      sizeof(__u32) * niocount);
1289         } else {
1290                 if (cli->cl_checksum &&
1291                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1292                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1293                                 body->oa.o_flags = 0;
1294                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1295                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1296                 }
1297         }
1298         ptlrpc_request_set_replen(req);
1299
1300         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1301         aa = ptlrpc_req_async_args(req);
1302         aa->aa_oa = oa;
1303         aa->aa_requested_nob = requested_nob;
1304         aa->aa_nio_count = niocount;
1305         aa->aa_page_count = page_count;
1306         aa->aa_resends = 0;
1307         aa->aa_ppga = pga;
1308         aa->aa_cli = cli;
1309         INIT_LIST_HEAD(&aa->aa_oaps);
1310         if (ocapa && reserve)
1311                 aa->aa_ocapa = capa_get(ocapa);
1312
1313         *reqp = req;
1314         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1315         CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1316                 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1317                 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1318         RETURN(0);
1319
1320  out:
1321         ptlrpc_req_finished(req);
1322         RETURN(rc);
1323 }
1324
1325 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1326                                 __u32 client_cksum, __u32 server_cksum, int nob,
1327                                 obd_count page_count, struct brw_page **pga,
1328                                 cksum_type_t client_cksum_type)
1329 {
1330         __u32 new_cksum;
1331         char *msg;
1332         cksum_type_t cksum_type;
1333
1334         if (server_cksum == client_cksum) {
1335                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1336                 return 0;
1337         }
1338
1339         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1340                                        oa->o_flags : 0);
1341         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1342                                       cksum_type);
1343
1344         if (cksum_type != client_cksum_type)
1345                 msg = "the server did not use the checksum type specified in "
1346                       "the original request - likely a protocol problem";
1347         else if (new_cksum == server_cksum)
1348                 msg = "changed on the client after we checksummed it - "
1349                       "likely false positive due to mmap IO (bug 11742)";
1350         else if (new_cksum == client_cksum)
1351                 msg = "changed in transit before arrival at OST";
1352         else
1353                 msg = "changed in transit AND doesn't match the original - "
1354                       "likely false positive due to mmap IO (bug 11742)";
1355
1356         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1357                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1358                            msg, libcfs_nid2str(peer->nid),
1359                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1360                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1361                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1362                            POSTID(&oa->o_oi), pga[0]->off,
1363                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1364         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1365                "client csum now %x\n", client_cksum, client_cksum_type,
1366                server_cksum, cksum_type, new_cksum);
1367         return 1;
1368 }
1369
1370 /* Note rc enters this function as number of bytes transferred */
1371 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1372 {
1373         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1374         const lnet_process_id_t *peer =
1375                         &req->rq_import->imp_connection->c_peer;
1376         struct client_obd *cli = aa->aa_cli;
1377         struct ost_body *body;
1378         __u32 client_cksum = 0;
1379         ENTRY;
1380
1381         if (rc < 0 && rc != -EDQUOT) {
1382                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1383                 RETURN(rc);
1384         }
1385
1386         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1387         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1388         if (body == NULL) {
1389                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1390                 RETURN(-EPROTO);
1391         }
1392
1393         /* set/clear over quota flag for a uid/gid */
1394         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1395             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1396                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1397
1398                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1399                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1400                        body->oa.o_flags);
1401                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1402         }
1403
1404         osc_update_grant(cli, body);
1405
1406         if (rc < 0)
1407                 RETURN(rc);
1408
1409         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1410                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1411
1412         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1413                 if (rc > 0) {
1414                         CERROR("Unexpected +ve rc %d\n", rc);
1415                         RETURN(-EPROTO);
1416                 }
1417                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1418
1419                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1420                         RETURN(-EAGAIN);
1421
1422                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1423                     check_write_checksum(&body->oa, peer, client_cksum,
1424                                          body->oa.o_cksum, aa->aa_requested_nob,
1425                                          aa->aa_page_count, aa->aa_ppga,
1426                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1427                         RETURN(-EAGAIN);
1428
1429                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1430                                      aa->aa_page_count, aa->aa_ppga);
1431                 GOTO(out, rc);
1432         }
1433
1434         /* The rest of this function executes only for OST_READs */
1435
1436         /* if unwrap_bulk failed, return -EAGAIN to retry */
1437         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1438         if (rc < 0)
1439                 GOTO(out, rc = -EAGAIN);
1440
1441         if (rc > aa->aa_requested_nob) {
1442                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1443                        aa->aa_requested_nob);
1444                 RETURN(-EPROTO);
1445         }
1446
1447         if (rc != req->rq_bulk->bd_nob_transferred) {
1448                 CERROR ("Unexpected rc %d (%d transferred)\n",
1449                         rc, req->rq_bulk->bd_nob_transferred);
1450                 return (-EPROTO);
1451         }
1452
1453         if (rc < aa->aa_requested_nob)
1454                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1455
1456         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1457                 static int cksum_counter;
1458                 __u32      server_cksum = body->oa.o_cksum;
1459                 char      *via;
1460                 char      *router;
1461                 cksum_type_t cksum_type;
1462
1463                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1464                                                body->oa.o_flags : 0);
1465                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1466                                                  aa->aa_ppga, OST_READ,
1467                                                  cksum_type);
1468
1469                 if (peer->nid == req->rq_bulk->bd_sender) {
1470                         via = router = "";
1471                 } else {
1472                         via = " via ";
1473                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1474                 }
1475
1476                 if (server_cksum != client_cksum) {
1477                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1478                                            "%s%s%s inode "DFID" object "DOSTID
1479                                            " extent ["LPU64"-"LPU64"]\n",
1480                                            req->rq_import->imp_obd->obd_name,
1481                                            libcfs_nid2str(peer->nid),
1482                                            via, router,
1483                                            body->oa.o_valid & OBD_MD_FLFID ?
1484                                                 body->oa.o_parent_seq : (__u64)0,
1485                                            body->oa.o_valid & OBD_MD_FLFID ?
1486                                                 body->oa.o_parent_oid : 0,
1487                                            body->oa.o_valid & OBD_MD_FLFID ?
1488                                                 body->oa.o_parent_ver : 0,
1489                                            POSTID(&body->oa.o_oi),
1490                                            aa->aa_ppga[0]->off,
1491                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1492                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1493                                                                         1);
1494                         CERROR("client %x, server %x, cksum_type %x\n",
1495                                client_cksum, server_cksum, cksum_type);
1496                         cksum_counter = 0;
1497                         aa->aa_oa->o_cksum = client_cksum;
1498                         rc = -EAGAIN;
1499                 } else {
1500                         cksum_counter++;
1501                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1502                         rc = 0;
1503                 }
1504         } else if (unlikely(client_cksum)) {
1505                 static int cksum_missed;
1506
1507                 cksum_missed++;
1508                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1509                         CERROR("Checksum %u requested from %s but not sent\n",
1510                                cksum_missed, libcfs_nid2str(peer->nid));
1511         } else {
1512                 rc = 0;
1513         }
1514 out:
1515         if (rc >= 0)
1516                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1517                                      aa->aa_oa, &body->oa);
1518
1519         RETURN(rc);
1520 }
1521
1522 static int osc_brw_redo_request(struct ptlrpc_request *request,
1523                                 struct osc_brw_async_args *aa, int rc)
1524 {
1525         struct ptlrpc_request *new_req;
1526         struct osc_brw_async_args *new_aa;
1527         struct osc_async_page *oap;
1528         ENTRY;
1529
1530         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1531                   "redo for recoverable error %d", rc);
1532
1533         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1534                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1535                                   aa->aa_cli, aa->aa_oa,
1536                                   NULL /* lsm unused by osc currently */,
1537                                   aa->aa_page_count, aa->aa_ppga,
1538                                   &new_req, aa->aa_ocapa, 0, 1);
1539         if (rc)
1540                 RETURN(rc);
1541
1542         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1543                 if (oap->oap_request != NULL) {
1544                         LASSERTF(request == oap->oap_request,
1545                                  "request %p != oap_request %p\n",
1546                                  request, oap->oap_request);
1547                         if (oap->oap_interrupted) {
1548                                 ptlrpc_req_finished(new_req);
1549                                 RETURN(-EINTR);
1550                         }
1551                 }
1552         }
1553         /* New request takes over pga and oaps from old request.
1554          * Note that copying a list_head doesn't work, need to move it... */
1555         aa->aa_resends++;
1556         new_req->rq_interpret_reply = request->rq_interpret_reply;
1557         new_req->rq_async_args = request->rq_async_args;
1558         new_req->rq_commit_cb = request->rq_commit_cb;
1559         /* cap resend delay to the current request timeout, this is similar to
1560          * what ptlrpc does (see after_reply()) */
1561         if (aa->aa_resends > new_req->rq_timeout)
1562                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1563         else
1564                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1565         new_req->rq_generation_set = 1;
1566         new_req->rq_import_generation = request->rq_import_generation;
1567
1568         new_aa = ptlrpc_req_async_args(new_req);
1569
1570         INIT_LIST_HEAD(&new_aa->aa_oaps);
1571         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1572         INIT_LIST_HEAD(&new_aa->aa_exts);
1573         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1574         new_aa->aa_resends = aa->aa_resends;
1575
1576         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1577                 if (oap->oap_request) {
1578                         ptlrpc_req_finished(oap->oap_request);
1579                         oap->oap_request = ptlrpc_request_addref(new_req);
1580                 }
1581         }
1582
1583         new_aa->aa_ocapa = aa->aa_ocapa;
1584         aa->aa_ocapa = NULL;
1585
1586         /* XXX: This code will run into problem if we're going to support
1587          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1588          * and wait for all of them to be finished. We should inherit request
1589          * set from old request. */
1590         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1591
1592         DEBUG_REQ(D_INFO, new_req, "new request");
1593         RETURN(0);
1594 }
1595
1596 /*
1597  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1598  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1599  * fine for our small page arrays and doesn't require allocation.  its an
1600  * insertion sort that swaps elements that are strides apart, shrinking the
1601  * stride down until its '1' and the array is sorted.
1602  */
1603 static void sort_brw_pages(struct brw_page **array, int num)
1604 {
1605         int stride, i, j;
1606         struct brw_page *tmp;
1607
1608         if (num == 1)
1609                 return;
1610         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1611                 ;
1612
1613         do {
1614                 stride /= 3;
1615                 for (i = stride ; i < num ; i++) {
1616                         tmp = array[i];
1617                         j = i;
1618                         while (j >= stride && array[j - stride]->off > tmp->off) {
1619                                 array[j] = array[j - stride];
1620                                 j -= stride;
1621                         }
1622                         array[j] = tmp;
1623                 }
1624         } while (stride > 1);
1625 }
1626
1627 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1628 {
1629         LASSERT(ppga != NULL);
1630         OBD_FREE(ppga, sizeof(*ppga) * count);
1631 }
1632
/*
 * Reply interpreter for a bulk read/write RPC (osc_build_rpc() installs it
 * as rq_interpret_reply).
 *
 * The reply is first finalised by osc_brw_fini_request().  If the result is
 * a recoverable error the request may be re-issued through
 * osc_brw_redo_request(); when that succeeds this function returns 0 right
 * away and leaves all completion work to the new request.  Otherwise the
 * request is completed here: the capability is released, on success the
 * object attributes are refreshed from the obdo, the obdo is freed, every
 * extent on aa_exts is finished, the cl_req is completed, the page array is
 * freed, the in-flight counter is dropped and more I/O is unplugged.
 *
 * \a data is the request's struct osc_brw_async_args.
 *
 * Returns 0 on success or when a resend was queued, otherwise a negative
 * error; -EAGAIN and -EINPROGRESS that could not be retried become -EIO.
 */
1633 static int brw_interpret(const struct lu_env *env,
1634                          struct ptlrpc_request *req, void *data, int rc)
1635 {
1636         struct osc_brw_async_args *aa = data;
1637         struct osc_extent *ext;
1638         struct osc_extent *tmp;
1639         struct client_obd *cli = aa->aa_cli;
1640         ENTRY;
1641
1642         rc = osc_brw_fini_request(req, rc);
1643         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1644         /* When server return -EINPROGRESS, client should always retry
1645          * regardless of the number of times the bulk was resent already. */
1646         if (osc_recoverable_error(rc)) {
1647                 if (req->rq_import_generation !=
1648                     req->rq_import->imp_generation) {
                     /* the import generation changed since the request was
                      * built: do not resend, just log and fail below */
1649                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1650                                ""DOSTID", rc = %d.\n",
1651                                req->rq_import->imp_obd->obd_name,
1652                                POSTID(&aa->aa_oa->o_oi), rc);
1653                 } else if (rc == -EINPROGRESS ||
1654                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1655                         rc = osc_brw_redo_request(req, aa, rc);
1656                 } else {
                     /* NOTE(review): this message formats the object id as
                      * LPU64":"LPU64 while the one above uses DOSTID for the
                      * same POSTID() arguments -- confirm whether the
                      * difference is intentional. */
1657                         CERROR("%s: too many resent retries for object: "
1658                                ""LPU64":"LPU64", rc = %d.\n",
1659                                req->rq_import->imp_obd->obd_name,
1660                                POSTID(&aa->aa_oa->o_oi), rc);
1661                 }
1662
             /* rc is 0 here only if osc_brw_redo_request() succeeded
              * (assuming osc_recoverable_error() never accepts 0 -- TODO
              * confirm); the new request then owns the pages and extents. */
1663                 if (rc == 0)
1664                         RETURN(0);
1665                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1666                         rc = -EIO;
1667         }
1668
1669         if (aa->aa_ocapa) {
1670                 capa_put(aa->aa_ocapa);
1671                 aa->aa_ocapa = NULL;
1672         }
1673
     /* On success copy the attributes flagged valid in the obdo into the
      * cl_object, under the object's attribute lock. */
1674         if (rc == 0) {
1675                 struct obdo *oa = aa->aa_oa;
1676                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1677                 unsigned long valid = 0;
1678                 struct cl_object *obj;
1679                 struct osc_async_page *last;
1680
             /* the object is found through the last entry of the page
              * array */
1681                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1682                 obj = osc2cl(last->oap_obj);
1683
1684                 cl_object_attr_lock(obj);
1685                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1686                         attr->cat_blocks = oa->o_blocks;
1687                         valid |= CAT_BLOCKS;
1688                 }
1689                 if (oa->o_valid & OBD_MD_FLMTIME) {
1690                         attr->cat_mtime = oa->o_mtime;
1691                         valid |= CAT_MTIME;
1692                 }
1693                 if (oa->o_valid & OBD_MD_FLATIME) {
1694                         attr->cat_atime = oa->o_atime;
1695                         valid |= CAT_ATIME;
1696                 }
1697                 if (oa->o_valid & OBD_MD_FLCTIME) {
1698                         attr->cat_ctime = oa->o_ctime;
1699                         valid |= CAT_CTIME;
1700                 }
1701
1702                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1703                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                     /* end offset of the last page in the array */
1704                         loff_t last_off = last->oap_count + last->oap_obj_off +
1705                                 last->oap_page_off;
1706
1707                         /* Change file size if this is an out of quota or
1708                          * direct IO write and it extends the file size */
1709                         if (loi->loi_lvb.lvb_size < last_off) {
1710                                 attr->cat_size = last_off;
1711                                 valid |= CAT_SIZE;
1712                         }
1713                         /* Extend KMS if it's not a lockless write */
1714                         if (loi->loi_kms < last_off &&
1715                             oap2osc_page(last)->ops_srvlock == 0) {
1716                                 attr->cat_kms = last_off;
1717                                 valid |= CAT_KMS;
1718                         }
1719                 }
1720
1721                 if (valid != 0)
1722                         cl_object_attr_set(env, obj, attr, valid);
1723                 cl_object_attr_unlock(obj);
1724         }
1725         OBDO_FREE(aa->aa_oa);
1726
1727         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1728                 osc_inc_unstable_pages(req);
1729
     /* finish every extent carried by this RPC with the final result */
1730         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1731                 list_del_init(&ext->oe_link);
1732                 osc_extent_finish(env, ext, 1, rc);
1733         }
1734         LASSERT(list_empty(&aa->aa_exts));
1735         LASSERT(list_empty(&aa->aa_oaps));
1736
     /* report either the error or the bulk descriptor's
      * bd_nob_transferred count to the cl_req */
1737         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1738                           req->rq_bulk->bd_nob_transferred);
1739         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1740         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1741
1742         client_obd_list_lock(&cli->cl_loi_list_lock);
1743         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1744          * is called so we know whether to go to sync BRWs or wait for more
1745          * RPCs to complete */
1746         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1747                 cli->cl_w_in_flight--;
1748         else
1749                 cli->cl_r_in_flight--;
1750         osc_wake_cache_waiters(cli);
1751         client_obd_list_unlock(&cli->cl_loi_list_lock);
1752
1753         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1754         RETURN(rc);
1755 }
1756
1757 static void brw_commit(struct ptlrpc_request *req)
1758 {
1759         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1760          * this called via the rq_commit_cb, I need to ensure
1761          * osc_dec_unstable_pages is still called. Otherwise unstable
1762          * pages may be leaked. */
1763         spin_lock(&req->rq_lock);
1764         if (likely(req->rq_unstable)) {
1765                 req->rq_unstable = 0;
1766                 spin_unlock(&req->rq_lock);
1767
1768                 osc_dec_unstable_pages(req);
1769         } else {
1770                 req->rq_committed = 1;
1771                 spin_unlock(&req->rq_lock);
1772         }
1773 }
1774
1775 /**
1776  * Build an RPC by the list of extent @ext_list. The caller must ensure
1777  * that the total pages in this list are NOT over max pages per RPC.
1778  * Extents in the list must be in OES_RPC state.
      *
      * All pages of all extents are gathered into one page array, sorted by
      * offset, packed into a single BRW request by osc_brw_prep_request() and
      * handed to ptlrpcd; brw_interpret() and brw_commit() are installed as
      * the reply and commit callbacks.
      *
      * \param env       environment passed on to the cl_req_*() calls and to
      *                  osc_extent_finish()
      * \param cli       client obd whose in-flight counters and histograms
      *                  are updated
      * \param ext_list  non-empty list of extents; spliced onto the request's
      *                  async args on success, and on failure every extent is
      *                  removed from it and finished with the error
      * \param cmd       OBD_BRW_* command: the OBD_BRW_WRITE bit selects
      *                  CRT_WRITE for the cl_req, and read accounting is used
      *                  only when cmd == OBD_BRW_READ
      * \param pol       policy forwarded to ptlrpcd_add_req()
      *
      * \retval 0        the request was queued on ptlrpcd
      * \retval -ENOMEM  an allocation failed
      * \retval other    error from cl_req_alloc(), cl_req_prep() or
      *                  osc_brw_prep_request()
1779  */
1780 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1781                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
1782 {
1783         struct ptlrpc_request           *req = NULL;
1784         struct osc_extent               *ext;
1785         struct brw_page                 **pga = NULL;
1786         struct osc_brw_async_args       *aa = NULL;
1787         struct obdo                     *oa = NULL;
1788         struct osc_async_page           *oap;
1789         struct osc_async_page           *tmp;
1790         struct cl_req                   *clerq = NULL;
1791         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1792                                                                       CRT_READ;
1793         struct cl_req_attr              *crattr = NULL;
1794         obd_off                         starting_offset = OBD_OBJECT_EOF;
1795         obd_off                         ending_offset = 0;
1796         int                             mpflag = 0;
1797         int                             mem_tight = 0;
1798         int                             page_count = 0;
1799         bool                            soft_sync = false;
1800         int                             i;
1801         int                             rc;
1802         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1803
1804         ENTRY;
1805         LASSERT(!list_empty(ext_list));
1806
1807         /* add pages into rpc_list to build BRW rpc */
1808         list_for_each_entry(ext, ext_list, oe_link) {
1809                 LASSERT(ext->oe_state == OES_RPC);
1810                 mem_tight |= ext->oe_memalloc;
1811                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1812                         ++page_count;
1813                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
                     /* Track the lowest start and highest end offset seen.
                      * A page that does not lower the start must begin at
                      * page offset 0, and a page that does not raise the end
                      * must run up to PAGE_CACHE_SIZE. */
1814                         if (starting_offset > oap->oap_obj_off)
1815                                 starting_offset = oap->oap_obj_off;
1816                         else
1817                                 LASSERT(oap->oap_page_off == 0);
1818                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1819                                 ending_offset = oap->oap_obj_off +
1820                                                 oap->oap_count;
1821                         else
1822                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1823                                         PAGE_CACHE_SIZE);
1824                 }
1825         }
1826
1827         soft_sync = osc_over_unstable_soft_limit(cli);
     /* mpflag saved here is handed back to cfs_memory_pressure_restore()
      * at "out" */
1828         if (mem_tight)
1829                 mpflag = cfs_memory_pressure_get_and_set();
1830
1831         OBD_ALLOC(crattr, sizeof(*crattr));
1832         if (crattr == NULL)
1833                 GOTO(out, rc = -ENOMEM);
1834
1835         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1836         if (pga == NULL)
1837                 GOTO(out, rc = -ENOMEM);
1838
1839         OBDO_ALLOC(oa);
1840         if (oa == NULL)
1841                 GOTO(out, rc = -ENOMEM);
1842
     /* fill the page array in list order and attach every page to the
      * cl_req, which is allocated from the first page */
1843         i = 0;
1844         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1845                 struct cl_page *page = oap2cl_page(oap);
1846                 if (clerq == NULL) {
1847                         clerq = cl_req_alloc(env, page, crt,
1848                                              1 /* only 1-object rpcs for now */);
1849                         if (IS_ERR(clerq))
1850                                 GOTO(out, rc = PTR_ERR(clerq));
1851                 }
1852                 if (mem_tight)
1853                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1854                 if (soft_sync)
1855                         oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1856                 pga[i] = &oap->oap_brw_page;
1857                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1858                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1859                        pga[i]->pg, page_index(oap->oap_page), oap,
1860                        pga[i]->flag);
1861                 i++;
1862                 cl_req_page_add(env, clerq, page);
1863         }
1864
1865         /* always get the data for the obdo for the rpc */
1866         LASSERT(clerq != NULL);
1867         crattr->cra_oa = oa;
1868         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1869
1870         rc = cl_req_prep(env, clerq);
1871         if (rc != 0) {
1872                 CERROR("cl_req_prep failed: %d\n", rc);
1873                 GOTO(out, rc);
1874         }
1875
     /* pages go on the wire in ascending offset order */
1876         sort_brw_pages(pga, page_count);
1877         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1878                         pga, &req, crattr->cra_capa, 1, 0);
1879         if (rc != 0) {
1880                 CERROR("prep_req failed: %d\n", rc);
1881                 GOTO(out, rc);
1882         }
1883
1884         req->rq_commit_cb = brw_commit;
1885         req->rq_interpret_reply = brw_interpret;
1886
1887         if (mem_tight != 0)
1888                 req->rq_memalloc = 1;
1889
1890         /* Need to update the timestamps after the request is built in case
1891          * we race with setattr (locally or in queue at OST).  If OST gets
1892          * later setattr before earlier BRW (as determined by the request xid),
1893          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1894          * way to do this in a single call.  bug 10150 */
1895         cl_req_attr_set(env, clerq, crattr,
1896                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1897
1898         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1899
     /* the async args live inside the request; move the page and extent
      * lists onto them so that brw_interpret() can complete them */
1900         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1901         aa = ptlrpc_req_async_args(req);
1902         INIT_LIST_HEAD(&aa->aa_oaps);
1903         list_splice_init(&rpc_list, &aa->aa_oaps);
1904         INIT_LIST_HEAD(&aa->aa_exts);
1905         list_splice_init(ext_list, &aa->aa_exts);
1906         aa->aa_clerq = clerq;
1907
1908         /* queued sync pages can be torn down while the pages
1909          * were between the pending list and the rpc */
1910         tmp = NULL;
1911         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1912                 /* only one oap gets a request reference */
1913                 if (tmp == NULL)
1914                         tmp = oap;
1915                 if (oap->oap_interrupted && !req->rq_intr) {
1916                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1917                                         oap, req);
1918                         ptlrpc_mark_interrupted(req);
1919                 }
1920         }
     /* tmp is the first oap on the list, if any */
1921         if (tmp != NULL)
1922                 tmp->oap_request = ptlrpc_request_addref(req);
1923
     /* account the new RPC in the in-flight counters and histograms;
      * starting_offset is converted to a page index for the offset
      * histogram */
1924         client_obd_list_lock(&cli->cl_loi_list_lock);
1925         starting_offset >>= PAGE_CACHE_SHIFT;
1926         if (cmd == OBD_BRW_READ) {
1927                 cli->cl_r_in_flight++;
1928                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1929                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1930                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1931                                       starting_offset + 1);
1932         } else {
1933                 cli->cl_w_in_flight++;
1934                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1935                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1936                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1937                                       starting_offset + 1);
1938         }
1939         client_obd_list_unlock(&cli->cl_loi_list_lock);
1940
1941         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1942                   page_count, aa, cli->cl_r_in_flight,
1943                   cli->cl_w_in_flight);
1944
1945         /* XXX: Maybe the caller can check the RPC bulk descriptor to
1946          * see which CPU/NUMA node the majority of pages were allocated
1947          * on, and try to assign the async RPC to the CPU core
1948          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
1949          *
1950          * But on the other hand, we expect that multiple ptlrpcd
1951          * threads and the initial write sponsor can run in parallel,
1952          * especially when data checksum is enabled, which is CPU-bound
1953          * operation and single ptlrpcd thread cannot process in time.
1954          * So more ptlrpcd threads sharing BRW load
1955          * (with PDL_POLICY_ROUND) seems better.
1956          */
1957         ptlrpcd_add_req(req, pol, -1);
1958         rc = 0;
1959         EXIT;
1960
     /* Reached on success (rc == 0) as well as on every failure above. */
1961 out:
1962         if (mem_tight != 0)
1963                 cfs_memory_pressure_restore(mpflag);
1964
1965         if (crattr != NULL) {
1966                 capa_put(crattr->cra_capa);
1967                 OBD_FREE(crattr, sizeof(*crattr));
1968         }
1969
     /* Failure only: no request was created, so release what was
      * allocated here and finish the extents with the error. */
1970         if (rc != 0) {
1971                 LASSERT(req == NULL);
1972
1973                 if (oa)
1974                         OBDO_FREE(oa);
1975                 if (pga)
1976                         OBD_FREE(pga, sizeof(*pga) * page_count);
1977                 /* this should happen rarely and is pretty bad, it makes the
1978                  * pending list not follow the dirty order */
1979                 while (!list_empty(ext_list)) {
1980                         ext = list_entry(ext_list->next, struct osc_extent,
1981                                          oe_link);
1982                         list_del_init(&ext->oe_link);
1983                         osc_extent_finish(env, ext, 0, rc);
1984                 }
             /* clerq may hold an ERR_PTR if cl_req_alloc() failed */
1985                 if (clerq && !IS_ERR(clerq))
1986                         cl_req_completion(env, clerq, rc);
1987         }
1988         RETURN(rc);
1989 }
1990
1991 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1992                                         struct ldlm_enqueue_info *einfo)
1993 {
1994         void *data = einfo->ei_cbdata;
1995         int set = 0;
1996
1997         LASSERT(lock != NULL);
1998         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1999         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2000         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2001         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2002
2003         lock_res_and_lock(lock);
2004
2005         if (lock->l_ast_data == NULL)
2006                 lock->l_ast_data = data;
2007         if (lock->l_ast_data == data)
2008                 set = 1;
2009
2010         unlock_res_and_lock(lock);
2011
2012         return set;
2013 }
2014
2015 static int osc_set_data_with_check(struct lustre_handle *lockh,
2016                                    struct ldlm_enqueue_info *einfo)
2017 {
2018         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2019         int set = 0;
2020
2021         if (lock != NULL) {
2022                 set = osc_set_lock_data_with_check(lock, einfo);
2023                 LDLM_LOCK_PUT(lock);
2024         } else
2025                 CERROR("lockh %p, data %p - client evicted?\n",
2026                        lockh, einfo->ei_cbdata);
2027         return set;
2028 }
2029
2030 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2031                              ldlm_iterator_t replace, void *data)
2032 {
2033         struct ldlm_res_id res_id;
2034         struct obd_device *obd = class_exp2obd(exp);
2035
2036         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2037         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2038         return 0;
2039 }
2040
2041 /* find any ldlm lock of the inode in osc
2042  * return 0    not find
2043  *        1    find one
2044  *      < 0    error */
2045 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2046                            ldlm_iterator_t replace, void *data)
2047 {
2048         struct ldlm_res_id res_id;
2049         struct obd_device *obd = class_exp2obd(exp);
2050         int rc = 0;
2051
2052         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2053         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2054         if (rc == LDLM_ITER_STOP)
2055                 return(1);
2056         if (rc == LDLM_ITER_CONTINUE)
2057                 return(0);
2058         return(rc);
2059 }
2060
2061 static int osc_enqueue_fini(struct ptlrpc_request *req,
2062                             osc_enqueue_upcall_f upcall, void *cookie,
2063                             struct lustre_handle *lockh, ldlm_mode_t mode,
2064                             __u64 *flags, int agl, int errcode)
2065 {
2066         bool intent = *flags & LDLM_FL_HAS_INTENT;
2067         int rc;
2068         ENTRY;
2069
2070         /* The request was created before ldlm_cli_enqueue call. */
2071         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2072                 struct ldlm_reply *rep;
2073
2074                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2075                 LASSERT(rep != NULL);
2076
2077                 rep->lock_policy_res1 =
2078                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2079                 if (rep->lock_policy_res1)
2080                         errcode = rep->lock_policy_res1;
2081                 if (!agl)
2082                         *flags |= LDLM_FL_LVB_READY;
2083         } else if (errcode == ELDLM_OK) {
2084                 *flags |= LDLM_FL_LVB_READY;
2085         }
2086
2087         /* Call the update callback. */
2088         rc = (*upcall)(cookie, lockh, errcode);
2089
2090         /* release the reference taken in ldlm_cli_enqueue() */
2091         if (errcode == ELDLM_LOCK_MATCHED)
2092                 errcode = ELDLM_OK;
2093         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2094                 ldlm_lock_decref(lockh, mode);
2095
2096         RETURN(rc);
2097 }
2098
/*
 * Finish an asynchronous lock enqueue described by \a aa.
 * NOTE(review): presumably installed as the reply interpreter of the
 * enqueue RPC by osc_enqueue_base() -- the installation is not visible
 * here, confirm.
 *
 * The lock is looked up from aa->oa_lockh and an extra reference of
 * aa->oa_mode is held while ldlm_cli_enqueue_fini() and then
 * osc_enqueue_fini() (which runs the upcall) are called; the extra
 * reference and the lock pointer are released afterwards.
 *
 * Returns the result of osc_enqueue_fini().
 */
2099 static int osc_enqueue_interpret(const struct lu_env *env,
2100                                  struct ptlrpc_request *req,
2101                                  struct osc_enqueue_args *aa, int rc)
2102 {
2103         struct ldlm_lock *lock;
2104         struct lustre_handle *lockh = &aa->oa_lockh;
2105         ldlm_mode_t mode = aa->oa_mode;
2106         struct ost_lvb *lvb = aa->oa_lvb;
2107         __u32 lvb_len = sizeof(*lvb);
2108         __u64 flags = 0;
2109
2110         ENTRY;
2111
2112         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2113          * be valid. */
2114         lock = ldlm_handle2lock(lockh);
2115         LASSERTF(lock != NULL,
2116                  "lockh "LPX64", req %p, aa %p - client evicted?\n",
2117                  lockh->cookie, req, aa);
2118
2119         /* Take an additional reference so that a blocking AST that
2120          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2121          * to arrive after an upcall has been executed by
2122          * osc_enqueue_fini(). */
2123         ldlm_lock_addref(lockh, mode);
2124
2125         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2126         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2127
2128         /* Let CP AST to grant the lock first. */
2129         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2130
     /* An AGL enqueue carries neither an LVB buffer nor flags storage
      * (both asserted NULL), so point oa_flags at a local variable for the
      * calls below. */
2131         if (aa->oa_agl) {
2132                 LASSERT(aa->oa_lvb == NULL);
2133                 LASSERT(aa->oa_flags == NULL);
2134                 aa->oa_flags = &flags;
2135         }
2136
2137         /* Complete obtaining the lock procedure. */
2138         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2139                                    aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2140                                    lockh, rc);
2141         /* Complete osc stuff. */
2142         rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2143                               aa->oa_flags, aa->oa_agl, rc);
2144
2145         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2146
     /* drop the additional reference taken above, then the pointer
      * reference from ldlm_handle2lock() */
2147         ldlm_lock_decref(lockh, mode);
2148         LDLM_LOCK_PUT(lock);
2149         RETURN(rc);
2150 }
2151
/* Placeholder request-set pointer with the constant value 1: non-NULL, and
 * not the address of a real set, so it must never be dereferenced.
 * NOTE(review): presumably passed as the rqset argument to ask for the
 * request to be handed to ptlrpcd -- confirm against its users. */
2152 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2153
2154 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2155  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2156  * other synchronous requests, however keeping some locks and trying to obtain
2157  * others may take a considerable amount of time in a case of ost failure; and
2158  * when other sync requests do not get released lock from a client, the client
2159  * is evicted from the cluster -- such scenarios make life difficult, so
2160  * release locks just after they are obtained. */
2161 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2162                      __u64 *flags, ldlm_policy_data_t *policy,
2163                      struct ost_lvb *lvb, int kms_valid,
2164                      osc_enqueue_upcall_f upcall, void *cookie,
2165                      struct ldlm_enqueue_info *einfo,
2166                      struct ptlrpc_request_set *rqset, int async, int agl)
2167 {
2168         struct obd_device *obd = exp->exp_obd;
2169         struct lustre_handle lockh = { 0 };
2170         struct ptlrpc_request *req = NULL;
2171         int intent = *flags & LDLM_FL_HAS_INTENT;
2172         __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
2173         ldlm_mode_t mode;
2174         int rc;
2175         ENTRY;
2176
2177         /* Filesystem lock extents are extended to page boundaries so that
2178          * dealing with the page cache is a little smoother.  */
2179         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2180         policy->l_extent.end |= ~CFS_PAGE_MASK;
2181
2182         /*
2183          * kms is not valid when either object is completely fresh (so that no
2184          * locks are cached), or object was evicted. In the latter case cached
2185          * lock cannot be used, because it would prime inode state with
2186          * potentially stale LVB.
2187          */
2188         if (!kms_valid)
2189                 goto no_match;
2190
2191         /* Next, search for already existing extent locks that will cover us */
2192         /* If we're trying to read, we also search for an existing PW lock.  The
2193          * VFS and page cache already protect us locally, so lots of readers/
2194          * writers can share a single PW lock.
2195          *
2196          * There are problems with conversion deadlocks, so instead of
2197          * converting a read lock to a write lock, we'll just enqueue a new
2198          * one.
2199          *
2200          * At some point we should cancel the read lock instead of making them
2201          * send us a blocking callback, but there are problems with canceling
2202          * locks out from other users right now, too. */
2203         mode = einfo->ei_mode;
2204         if (einfo->ei_mode == LCK_PR)
2205                 mode |= LCK_PW;
2206         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2207                                einfo->ei_type, policy, mode, &lockh, 0);
2208         if (mode) {
2209                 struct ldlm_lock *matched;
2210
2211                 if (*flags & LDLM_FL_TEST_LOCK)
2212                         RETURN(ELDLM_OK);
2213
2214                 matched = ldlm_handle2lock(&lockh);
2215                 if (agl) {
2216                         /* AGL enqueues DLM locks speculatively. Therefore, if
2217                          * a DLM lock already exists, it will just inform the
2218                          * caller to cancel the AGL process for this stripe. */
2219                         ldlm_lock_decref(&lockh, mode);
2220                         LDLM_LOCK_PUT(matched);
2221                         RETURN(-ECANCELED);
2222                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2223                         *flags |= LDLM_FL_LVB_READY;
2224
2225                         /* We already have a lock, and it's referenced. */
2226                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2227
2228                         ldlm_lock_decref(&lockh, mode);
2229                         LDLM_LOCK_PUT(matched);
2230                         RETURN(ELDLM_OK);
2231                 } else {
2232                         ldlm_lock_decref(&lockh, mode);
2233                         LDLM_LOCK_PUT(matched);
2234                 }
2235         }
2236
2237 no_match:
2238         if (*flags & LDLM_FL_TEST_LOCK)
2239                 RETURN(-ENOLCK);
2240
2241         if (intent) {
2242                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2243                                            &RQF_LDLM_ENQUEUE_LVB);
2244                 if (req == NULL)
2245                         RETURN(-ENOMEM);
2246
2247                 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2248                 if (rc < 0) {
2249                         ptlrpc_request_free(req);
2250                         RETURN(rc);
2251                 }
2252
2253                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2254                                      sizeof *lvb);
2255                 ptlrpc_request_set_replen(req);
2256         }
2257
2258         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2259         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2260
2261         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2262                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2263         if (async) {
2264                 if (!rc) {
2265                         struct osc_enqueue_args *aa;
2266                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2267                         aa = ptlrpc_req_async_args(req);
2268                         aa->oa_exp    = exp;
2269                         aa->oa_mode   = einfo->ei_mode;
2270                         aa->oa_type   = einfo->ei_type;
2271                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2272                         aa->oa_upcall = upcall;
2273                         aa->oa_cookie = cookie;
2274                         aa->oa_agl    = !!agl;
2275                         if (!agl) {
2276                                 aa->oa_flags  = flags;
2277                                 aa->oa_lvb    = lvb;
2278                         } else {
2279                                 /* AGL is essentially to enqueue an DLM lock
2280                                  * in advance, so we don't care about the
2281                                  * result of AGL enqueue. */
2282                                 aa->oa_lvb    = NULL;
2283                                 aa->oa_flags  = NULL;
2284                         }
2285
2286                         req->rq_interpret_reply =
2287                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2288                         if (rqset == PTLRPCD_SET)
2289                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2290                         else
2291                                 ptlrpc_set_add_req(rqset, req);
2292                 } else if (intent) {
2293                         ptlrpc_req_finished(req);
2294                 }
2295                 RETURN(rc);
2296         }
2297
2298         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2299                               flags, agl, rc);
2300         if (intent)
2301                 ptlrpc_req_finished(req);
2302
2303         RETURN(rc);
2304 }
2305
2306 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2307                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2308                    __u64 *flags, void *data, struct lustre_handle *lockh,
2309                    int unref)
2310 {
2311         struct obd_device *obd = exp->exp_obd;
2312         __u64 lflags = *flags;
2313         ldlm_mode_t rc;
2314         ENTRY;
2315
2316         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2317                 RETURN(-EIO);
2318
2319         /* Filesystem lock extents are extended to page boundaries so that
2320          * dealing with the page cache is a little smoother */
2321         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2322         policy->l_extent.end |= ~CFS_PAGE_MASK;
2323
2324         /* Next, search for already existing extent locks that will cover us */
2325         /* If we're trying to read, we also search for an existing PW lock.  The
2326          * VFS and page cache already protect us locally, so lots of readers/
2327          * writers can share a single PW lock. */
2328         rc = mode;
2329         if (mode == LCK_PR)
2330                 rc |= LCK_PW;
2331         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2332                              res_id, type, policy, rc, lockh, unref);
2333         if (rc) {
2334                 if (data != NULL) {
2335                         if (!osc_set_data_with_check(lockh, data)) {
2336                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2337                                         ldlm_lock_decref(lockh, rc);
2338                                 RETURN(0);
2339                         }
2340                 }
2341                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2342                         ldlm_lock_addref(lockh, LCK_PR);
2343                         ldlm_lock_decref(lockh, LCK_PW);
2344                 }
2345                 RETURN(rc);
2346         }
2347         RETURN(rc);
2348 }
2349
2350 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2351 {
2352         ENTRY;
2353
2354         if (unlikely(mode == LCK_GROUP))
2355                 ldlm_lock_decref_and_cancel(lockh, mode);
2356         else
2357                 ldlm_lock_decref(lockh, mode);
2358
2359         RETURN(0);
2360 }
2361
2362 static int osc_statfs_interpret(const struct lu_env *env,
2363                                 struct ptlrpc_request *req,
2364                                 struct osc_async_args *aa, int rc)
2365 {
2366         struct obd_statfs *msfs;
2367         ENTRY;
2368
2369         if (rc == -EBADR)
2370                 /* The request has in fact never been sent
2371                  * due to issues at a higher level (LOV).
2372                  * Exit immediately since the caller is
2373                  * aware of the problem and takes care
2374                  * of the clean up */
2375                  RETURN(rc);
2376
2377         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2378             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2379                 GOTO(out, rc = 0);
2380
2381         if (rc != 0)
2382                 GOTO(out, rc);
2383
2384         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2385         if (msfs == NULL) {
2386                 GOTO(out, rc = -EPROTO);
2387         }
2388
2389         *aa->aa_oi->oi_osfs = *msfs;
2390 out:
2391         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2392         RETURN(rc);
2393 }
2394
2395 static int osc_statfs_async(struct obd_export *exp,
2396                             struct obd_info *oinfo, __u64 max_age,
2397                             struct ptlrpc_request_set *rqset)
2398 {
2399         struct obd_device     *obd = class_exp2obd(exp);
2400         struct ptlrpc_request *req;
2401         struct osc_async_args *aa;
2402         int                    rc;
2403         ENTRY;
2404
2405         /* We could possibly pass max_age in the request (as an absolute
2406          * timestamp or a "seconds.usec ago") so the target can avoid doing
2407          * extra calls into the filesystem if that isn't necessary (e.g.
2408          * during mount that would help a bit).  Having relative timestamps
2409          * is not so great if request processing is slow, while absolute
2410          * timestamps are not ideal because they need time synchronization. */
2411         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2412         if (req == NULL)
2413                 RETURN(-ENOMEM);
2414
2415         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2416         if (rc) {
2417                 ptlrpc_request_free(req);
2418                 RETURN(rc);
2419         }
2420         ptlrpc_request_set_replen(req);
2421         req->rq_request_portal = OST_CREATE_PORTAL;
2422         ptlrpc_at_set_req_timeout(req);
2423
2424         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2425                 /* procfs requests not want stat in wait for avoid deadlock */
2426                 req->rq_no_resend = 1;
2427                 req->rq_no_delay = 1;
2428         }
2429
2430         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2431         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2432         aa = ptlrpc_req_async_args(req);
2433         aa->aa_oi = oinfo;
2434
2435         ptlrpc_set_add_req(rqset, req);
2436         RETURN(0);
2437 }
2438
2439 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2440                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2441 {
2442         struct obd_device     *obd = class_exp2obd(exp);
2443         struct obd_statfs     *msfs;
2444         struct ptlrpc_request *req;
2445         struct obd_import     *imp = NULL;
2446         int rc;
2447         ENTRY;
2448
2449         /*Since the request might also come from lprocfs, so we need
2450          *sync this with client_disconnect_export Bug15684*/
2451         down_read(&obd->u.cli.cl_sem);
2452         if (obd->u.cli.cl_import)
2453                 imp = class_import_get(obd->u.cli.cl_import);
2454         up_read(&obd->u.cli.cl_sem);
2455         if (!imp)
2456                 RETURN(-ENODEV);
2457
2458         /* We could possibly pass max_age in the request (as an absolute
2459          * timestamp or a "seconds.usec ago") so the target can avoid doing
2460          * extra calls into the filesystem if that isn't necessary (e.g.
2461          * during mount that would help a bit).  Having relative timestamps
2462          * is not so great if request processing is slow, while absolute
2463          * timestamps are not ideal because they need time synchronization. */
2464         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2465
2466         class_import_put(imp);
2467
2468         if (req == NULL)
2469                 RETURN(-ENOMEM);
2470
2471         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2472         if (rc) {
2473                 ptlrpc_request_free(req);
2474                 RETURN(rc);
2475         }
2476         ptlrpc_request_set_replen(req);
2477         req->rq_request_portal = OST_CREATE_PORTAL;
2478         ptlrpc_at_set_req_timeout(req);
2479
2480         if (flags & OBD_STATFS_NODELAY) {
2481                 /* procfs requests not want stat in wait for avoid deadlock */
2482                 req->rq_no_resend = 1;
2483                 req->rq_no_delay = 1;
2484         }
2485
2486         rc = ptlrpc_queue_wait(req);
2487         if (rc)
2488                 GOTO(out, rc);
2489
2490         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2491         if (msfs == NULL) {
2492                 GOTO(out, rc = -EPROTO);
2493         }
2494
2495         *osfs = *msfs;
2496
2497         EXIT;
2498  out:
2499         ptlrpc_req_finished(req);
2500         return rc;
2501 }
2502
2503 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2504                          void *karg, void *uarg)
2505 {
2506         struct obd_device *obd = exp->exp_obd;
2507         struct obd_ioctl_data *data = karg;
2508         int err = 0;
2509         ENTRY;
2510
2511         if (!try_module_get(THIS_MODULE)) {
2512                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2513                        module_name(THIS_MODULE));
2514                 return -EINVAL;
2515         }
2516         switch (cmd) {
2517         case OBD_IOC_CLIENT_RECOVER:
2518                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2519                                             data->ioc_inlbuf1, 0);
2520                 if (err > 0)
2521                         err = 0;
2522                 GOTO(out, err);
2523         case IOC_OSC_SET_ACTIVE:
2524                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2525                                                data->ioc_offset);
2526                 GOTO(out, err);
2527         case OBD_IOC_POLL_QUOTACHECK:
2528                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2529                 GOTO(out, err);
2530         case OBD_IOC_PING_TARGET:
2531                 err = ptlrpc_obd_ping(obd);
2532                 GOTO(out, err);
2533         default:
2534                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2535                        cmd, current_comm());
2536                 GOTO(out, err = -ENOTTY);
2537         }
2538 out:
2539         module_put(THIS_MODULE);
2540         return err;
2541 }
2542
2543 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2544                         obd_count keylen, void *key, __u32 *vallen, void *val,
2545                         struct lov_stripe_md *lsm)
2546 {
2547         ENTRY;
2548         if (!vallen || !val)
2549                 RETURN(-EFAULT);
2550
2551         if (KEY_IS(KEY_FIEMAP)) {
2552                 struct ll_fiemap_info_key *fm_key =
2553                                 (struct ll_fiemap_info_key *)key;
2554                 struct ldlm_res_id       res_id;
2555                 ldlm_policy_data_t       policy;
2556                 struct lustre_handle     lockh;
2557                 ldlm_mode_t              mode = 0;
2558                 struct ptlrpc_request   *req;
2559                 struct ll_user_fiemap   *reply;
2560                 char                    *tmp;
2561                 int                      rc;
2562
2563                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2564                         goto skip_locking;
2565
2566                 policy.l_extent.start = fm_key->fiemap.fm_start &
2567                                                 CFS_PAGE_MASK;
2568
2569                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2570                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2571                         policy.l_extent.end = OBD_OBJECT_EOF;
2572                 else
2573                         policy.l_extent.end = (fm_key->fiemap.fm_start +
2574                                 fm_key->fiemap.fm_length +
2575                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2576
2577                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2578                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2579                                        LDLM_FL_BLOCK_GRANTED |
2580                                        LDLM_FL_LVB_READY,
2581                                        &res_id, LDLM_EXTENT, &policy,
2582                                        LCK_PR | LCK_PW, &lockh, 0);
2583                 if (mode) { /* lock is cached on client */
2584                         if (mode != LCK_PR) {
2585                                 ldlm_lock_addref(&lockh, LCK_PR);
2586                                 ldlm_lock_decref(&lockh, LCK_PW);
2587                         }
2588                 } else { /* no cached lock, needs acquire lock on server side */
2589                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2590                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2591                 }
2592
2593 skip_locking:
2594                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2595                                            &RQF_OST_GET_INFO_FIEMAP);
2596                 if (req == NULL)
2597                         GOTO(drop_lock, rc = -ENOMEM);
2598
2599                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2600                                      RCL_CLIENT, keylen);
2601                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2602                                      RCL_CLIENT, *vallen);
2603                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2604                                      RCL_SERVER, *vallen);
2605
2606                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2607                 if (rc) {
2608                         ptlrpc_request_free(req);
2609                         GOTO(drop_lock, rc);
2610                 }
2611
2612                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2613                 memcpy(tmp, key, keylen);
2614                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2615                 memcpy(tmp, val, *vallen);
2616
2617                 ptlrpc_request_set_replen(req);
2618                 rc = ptlrpc_queue_wait(req);
2619                 if (rc)
2620                         GOTO(fini_req, rc);
2621
2622                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2623                 if (reply == NULL)
2624                         GOTO(fini_req, rc = -EPROTO);
2625
2626                 memcpy(val, reply, *vallen);
2627 fini_req:
2628                 ptlrpc_req_finished(req);
2629 drop_lock:
2630                 if (mode)
2631                         ldlm_lock_decref(&lockh, LCK_PR);
2632                 RETURN(rc);
2633         }
2634
2635         RETURN(-EINVAL);
2636 }
2637
2638 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2639                               obd_count keylen, void *key, obd_count vallen,
2640                               void *val, struct ptlrpc_request_set *set)
2641 {
2642         struct ptlrpc_request *req;
2643         struct obd_device     *obd = exp->exp_obd;
2644         struct obd_import     *imp = class_exp2cliimp(exp);
2645         char                  *tmp;
2646         int                    rc;
2647         ENTRY;
2648
2649         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2650
2651         if (KEY_IS(KEY_CHECKSUM)) {
2652                 if (vallen != sizeof(int))
2653                         RETURN(-EINVAL);
2654                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2655                 RETURN(0);
2656         }
2657
2658         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2659                 sptlrpc_conf_client_adapt(obd);
2660                 RETURN(0);
2661         }
2662
2663         if (KEY_IS(KEY_FLUSH_CTX)) {
2664                 sptlrpc_import_flush_my_ctx(imp);
2665                 RETURN(0);
2666         }
2667
2668         if (KEY_IS(KEY_CACHE_SET)) {
2669                 struct client_obd *cli = &obd->u.cli;
2670
2671                 LASSERT(cli->cl_cache == NULL); /* only once */
2672                 cli->cl_cache = (struct cl_client_cache *)val;
2673                 atomic_inc(&cli->cl_cache->ccc_users);
2674                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2675
2676                 /* add this osc into entity list */
2677                 LASSERT(list_empty(&cli->cl_lru_osc));
2678                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2679                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2680                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2681
2682                 RETURN(0);
2683         }
2684
2685         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2686                 struct client_obd *cli = &obd->u.cli;
2687                 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2688                 long target = *(long *)val;
2689
2690                 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2691                 *(long *)val -= nr;
2692                 RETURN(0);
2693         }
2694
2695         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2696                 RETURN(-EINVAL);
2697
2698         /* We pass all other commands directly to OST. Since nobody calls osc
2699            methods directly and everybody is supposed to go through LOV, we
2700            assume lov checked invalid values for us.
2701            The only recognised values so far are evict_by_nid and mds_conn.
2702            Even if something bad goes through, we'd get a -EINVAL from OST
2703            anyway. */
2704
2705         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2706                                                 &RQF_OST_SET_GRANT_INFO :
2707                                                 &RQF_OBD_SET_INFO);
2708         if (req == NULL)
2709                 RETURN(-ENOMEM);
2710
2711         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2712                              RCL_CLIENT, keylen);
2713         if (!KEY_IS(KEY_GRANT_SHRINK))
2714                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2715                                      RCL_CLIENT, vallen);
2716         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2717         if (rc) {
2718                 ptlrpc_request_free(req);
2719                 RETURN(rc);
2720         }
2721
2722         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2723         memcpy(tmp, key, keylen);
2724         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2725                                                         &RMF_OST_BODY :
2726                                                         &RMF_SETINFO_VAL);
2727         memcpy(tmp, val, vallen);
2728
2729         if (KEY_IS(KEY_GRANT_SHRINK)) {
2730                 struct osc_grant_args *aa;
2731                 struct obdo *oa;
2732
2733                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2734                 aa = ptlrpc_req_async_args(req);
2735                 OBDO_ALLOC(oa);
2736                 if (!oa) {
2737                         ptlrpc_req_finished(req);
2738                         RETURN(-ENOMEM);
2739                 }
2740                 *oa = ((struct ost_body *)val)->oa;
2741                 aa->aa_oa = oa;
2742                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2743         }
2744
2745         ptlrpc_request_set_replen(req);
2746         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2747                 LASSERT(set != NULL);
2748                 ptlrpc_set_add_req(set, req);
2749                 ptlrpc_check_set(NULL, set);
2750         } else
2751                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2752
2753         RETURN(0);
2754 }
2755
2756 static int osc_reconnect(const struct lu_env *env,
2757                          struct obd_export *exp, struct obd_device *obd,
2758                          struct obd_uuid *cluuid,
2759                          struct obd_connect_data *data,
2760                          void *localdata)
2761 {
2762         struct client_obd *cli = &obd->u.cli;
2763
2764         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2765                 long lost_grant;
2766
2767                 client_obd_list_lock(&cli->cl_loi_list_lock);
2768                 data->ocd_grant = (cli->cl_avail_grant +
2769                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2770                                   2 * cli_brw_size(obd);
2771                 lost_grant = cli->cl_lost_grant;
2772                 cli->cl_lost_grant = 0;
2773                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2774
2775                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2776                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2777                        data->ocd_version, data->ocd_grant, lost_grant);
2778         }
2779
2780         RETURN(0);
2781 }
2782
2783 static int osc_disconnect(struct obd_export *exp)
2784 {
2785         struct obd_device *obd = class_exp2obd(exp);
2786         int rc;
2787
2788         rc = client_disconnect_export(exp);
2789         /**
2790          * Initially we put del_shrink_grant before disconnect_export, but it
2791          * causes the following problem if setup (connect) and cleanup
2792          * (disconnect) are tangled together.
2793          *      connect p1                     disconnect p2
2794          *   ptlrpc_connect_import
2795          *     ...............               class_manual_cleanup
2796          *                                     osc_disconnect
2797          *                                     del_shrink_grant
2798          *   ptlrpc_connect_interrupt
2799          *     init_grant_shrink
2800          *   add this client to shrink list
2801          *                                      cleanup_osc
2802          * Bang! pinger trigger the shrink.
2803          * So the osc should be disconnected from the shrink list, after we
2804          * are sure the import has been destroyed. BUG18662
2805          */
2806         if (obd->u.cli.cl_import == NULL)
2807                 osc_del_shrink_grant(&obd->u.cli);
2808         return rc;
2809 }
2810
2811 static int osc_import_event(struct obd_device *obd,
2812                             struct obd_import *imp,
2813                             enum obd_import_event event)
2814 {
2815         struct client_obd *cli;
2816         int rc = 0;
2817
2818         ENTRY;
2819         LASSERT(imp->imp_obd == obd);
2820
2821         switch (event) {
2822         case IMP_EVENT_DISCON: {
2823                 cli = &obd->u.cli;
2824                 client_obd_list_lock(&cli->cl_loi_list_lock);
2825                 cli->cl_avail_grant = 0;
2826                 cli->cl_lost_grant = 0;
2827                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2828                 break;
2829         }
2830         case IMP_EVENT_INACTIVE: {
2831                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2832                 break;
2833         }
2834         case IMP_EVENT_INVALIDATE: {
2835                 struct ldlm_namespace *ns = obd->obd_namespace;
2836                 struct lu_env         *env;
2837                 int                    refcheck;
2838
2839                 env = cl_env_get(&refcheck);
2840                 if (!IS_ERR(env)) {
2841                         /* Reset grants */
2842                         cli = &obd->u.cli;
2843                         /* all pages go to failing rpcs due to the invalid
2844                          * import */
2845                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
2846
2847                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2848                         cl_env_put(env, &refcheck);
2849                 } else
2850                         rc = PTR_ERR(env);
2851                 break;
2852         }
2853         case IMP_EVENT_ACTIVE: {
2854                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2855                 break;
2856         }
2857         case IMP_EVENT_OCD: {
2858                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2859
2860                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2861                         osc_init_grant(&obd->u.cli, ocd);
2862
2863                 /* See bug 7198 */
2864                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2865                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2866
2867                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2868                 break;
2869         }
2870         case IMP_EVENT_DEACTIVATE: {
2871                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2872                 break;
2873         }
2874         case IMP_EVENT_ACTIVATE: {
2875                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2876                 break;
2877         }
2878         default:
2879                 CERROR("Unknown import event %d\n", event);
2880                 LBUG();
2881         }
2882         RETURN(rc);
2883 }
2884
2885 /**
2886  * Determine whether the lock can be canceled before replaying the lock
2887  * during recovery, see bug16774 for detailed information.
2888  *
2889  * \retval zero the lock can't be canceled
2890  * \retval other ok to cancel
2891  */
2892 static int osc_cancel_weight(struct ldlm_lock *lock)
2893 {
2894         /*
2895          * Cancel all unused and granted extent lock.
2896          */
2897         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2898             lock->l_granted_mode == lock->l_req_mode &&
2899             osc_ldlm_weigh_ast(lock) == 0)
2900                 RETURN(1);
2901
2902         RETURN(0);
2903 }
2904
2905 static int brw_queue_work(const struct lu_env *env, void *data)
2906 {
2907         struct client_obd *cli = data;
2908
2909         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2910
2911         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2912         RETURN(0);
2913 }
2914
2915 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2916 {
2917         struct client_obd *cli = &obd->u.cli;
2918         struct obd_type   *type;
2919         void              *handler;
2920         int                rc;
2921         ENTRY;
2922
2923         rc = ptlrpcd_addref();
2924         if (rc)
2925                 RETURN(rc);
2926
2927         rc = client_obd_setup(obd, lcfg);
2928         if (rc)
2929                 GOTO(out_ptlrpcd, rc);
2930
2931         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2932         if (IS_ERR(handler))
2933                 GOTO(out_client_setup, rc = PTR_ERR(handler));
2934         cli->cl_writeback_work = handler;
2935
2936         handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2937         if (IS_ERR(handler))
2938                 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2939         cli->cl_lru_work = handler;
2940
2941         rc = osc_quota_setup(obd);
2942         if (rc)
2943                 GOTO(out_ptlrpcd_work, rc);
2944
2945         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2946
2947 #ifdef LPROCFS
2948         obd->obd_vars = lprocfs_osc_obd_vars;
2949 #endif
2950         /* If this is true then both client (osc) and server (osp) are on the
2951          * same node. The osp layer if loaded first will register the osc proc
2952          * directory. In that case this obd_device will attach its proc
2953          * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2954         type = class_search_type(LUSTRE_OSP_NAME);
2955         if (type && type->typ_procsym) {
2956                 obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name,
2957                                                            type->typ_procsym,
2958                                                            obd->obd_vars, obd);
2959                 if (IS_ERR(obd->obd_proc_entry)) {
2960                         rc = PTR_ERR(obd->obd_proc_entry);
2961                         CERROR("error %d setting up lprocfs for %s\n", rc,
2962                                obd->obd_name);
2963                         obd->obd_proc_entry = NULL;
2964                 }
2965         } else {
2966                 rc = lprocfs_obd_setup(obd);
2967         }
2968
2969         /* If the basic OSC proc tree construction succeeded then
2970          * lets do the rest. */
2971         if (rc == 0) {
2972                 lproc_osc_attach_seqstat(obd);
2973                 sptlrpc_lprocfs_cliobd_attach(obd);
2974                 ptlrpc_lprocfs_register_obd(obd);
2975         }
2976
2977         /* We need to allocate a few requests more, because
2978          * brw_interpret tries to create new requests before freeing
2979          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
2980          * reserved, but I'm afraid that might be too much wasted RAM
2981          * in fact, so 2 is just my guess and still should work. */
2982         cli->cl_import->imp_rq_pool =
2983                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
2984                                     OST_MAXREQSIZE,
2985                                     ptlrpc_add_rqs_to_pool);
2986
2987         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2988         ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
2989         RETURN(0);
2990
2991 out_ptlrpcd_work:
2992         if (cli->cl_writeback_work != NULL) {
2993                 ptlrpcd_destroy_work(cli->cl_writeback_work);
2994                 cli->cl_writeback_work = NULL;
2995         }
2996         if (cli->cl_lru_work != NULL) {
2997                 ptlrpcd_destroy_work(cli->cl_lru_work);
2998                 cli->cl_lru_work = NULL;
2999         }
3000 out_client_setup:
3001         client_obd_cleanup(obd);
3002 out_ptlrpcd:
3003         ptlrpcd_decref();
3004         RETURN(rc);
3005 }
3006
3007 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3008 {
3009         int rc = 0;
3010         ENTRY;
3011
3012         switch (stage) {
3013         case OBD_CLEANUP_EARLY: {
3014                 struct obd_import *imp;
3015                 imp = obd->u.cli.cl_import;
3016                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3017                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3018                 ptlrpc_deactivate_import(imp);
3019                 spin_lock(&imp->imp_lock);
3020                 imp->imp_pingable = 0;
3021                 spin_unlock(&imp->imp_lock);
3022                 break;
3023         }
3024         case OBD_CLEANUP_EXPORTS: {
3025                 struct client_obd *cli = &obd->u.cli;
3026                 /* LU-464
3027                  * for echo client, export may be on zombie list, wait for
3028                  * zombie thread to cull it, because cli.cl_import will be
3029                  * cleared in client_disconnect_export():
3030                  *   class_export_destroy() -> obd_cleanup() ->
3031                  *   echo_device_free() -> echo_client_cleanup() ->
3032                  *   obd_disconnect() -> osc_disconnect() ->
3033                  *   client_disconnect_export()
3034                  */
3035                 obd_zombie_barrier();
3036                 if (cli->cl_writeback_work) {
3037                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3038                         cli->cl_writeback_work = NULL;
3039                 }
3040                 if (cli->cl_lru_work) {
3041                         ptlrpcd_destroy_work(cli->cl_lru_work);
3042                         cli->cl_lru_work = NULL;
3043                 }
3044                 obd_cleanup_client_import(obd);
3045                 ptlrpc_lprocfs_unregister_obd(obd);
3046                 lprocfs_obd_cleanup(obd);
3047                 break;
3048                 }
3049         }
3050         RETURN(rc);
3051 }
3052
3053 int osc_cleanup(struct obd_device *obd)
3054 {
3055         struct client_obd *cli = &obd->u.cli;
3056         int rc;
3057
3058         ENTRY;
3059
3060         /* lru cleanup */
3061         if (cli->cl_cache != NULL) {
3062                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3063                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3064                 list_del_init(&cli->cl_lru_osc);
3065                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3066                 cli->cl_lru_left = NULL;
3067                 atomic_dec(&cli->cl_cache->ccc_users);
3068                 cli->cl_cache = NULL;
3069         }
3070
3071         /* free memory of osc quota cache */
3072         osc_quota_cleanup(obd);
3073
3074         rc = client_obd_cleanup(obd);
3075
3076         ptlrpcd_decref();
3077         RETURN(rc);
3078 }
3079
3080 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3081 {
3082         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3083         return rc > 0 ? 0: rc;
3084 }
3085
3086 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3087 {
3088         return osc_process_config_base(obd, buf);
3089 }
3090
3091 struct obd_ops osc_obd_ops = {
3092         .o_owner                = THIS_MODULE,
3093         .o_setup                = osc_setup,
3094         .o_precleanup           = osc_precleanup,
3095         .o_cleanup              = osc_cleanup,
3096         .o_add_conn             = client_import_add_conn,
3097         .o_del_conn             = client_import_del_conn,
3098         .o_connect              = client_connect_import,
3099         .o_reconnect            = osc_reconnect,
3100         .o_disconnect           = osc_disconnect,
3101         .o_statfs               = osc_statfs,
3102         .o_statfs_async         = osc_statfs_async,
3103         .o_create               = osc_create,
3104         .o_destroy              = osc_destroy,
3105         .o_getattr              = osc_getattr,
3106         .o_getattr_async        = osc_getattr_async,
3107         .o_setattr              = osc_setattr,
3108         .o_setattr_async        = osc_setattr_async,
3109         .o_change_cbdata        = osc_change_cbdata,
3110         .o_find_cbdata          = osc_find_cbdata,
3111         .o_iocontrol            = osc_iocontrol,
3112         .o_get_info             = osc_get_info,
3113         .o_set_info_async       = osc_set_info_async,
3114         .o_import_event         = osc_import_event,
3115         .o_process_config       = osc_process_config,
3116         .o_quotactl             = osc_quotactl,
3117         .o_quotacheck           = osc_quotacheck,
3118 };
3119
3120 extern struct lu_kmem_descr osc_caches[];
3121 extern struct lock_class_key osc_ast_guard_class;
3122
3123 int __init osc_init(void)
3124 {
3125         bool enable_proc = true;
3126         struct obd_type *type;
3127         int rc;
3128         ENTRY;
3129
3130         /* print an address of _any_ initialized kernel symbol from this
3131          * module, to allow debugging with gdb that doesn't support data
3132          * symbols from modules.*/
3133         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3134
3135         rc = lu_kmem_init(osc_caches);
3136         if (rc)
3137                 RETURN(rc);
3138
3139         type = class_search_type(LUSTRE_OSP_NAME);
3140         if (type != NULL && type->typ_procsym != NULL)
3141                 enable_proc = false;
3142
3143         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3144                                  LUSTRE_OSC_NAME, &osc_device_type);
3145         if (rc) {
3146                 lu_kmem_fini(osc_caches);
3147                 RETURN(rc);
3148         }
3149
3150         RETURN(rc);
3151 }
3152
3153 static void /*__exit*/ osc_exit(void)
3154 {
3155         class_unregister_type(LUSTRE_OSC_NAME);
3156         lu_kmem_fini(osc_caches);
3157 }
3158
3159 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3160 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3161 MODULE_LICENSE("GPL");
3162
3163 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);