/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * Copyright (c) 2011 Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli,
                            int ptlrpc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
        }

        RETURN(lmm_size);
}
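
/* osc_packmd() follows the usual obd metadata calling convention, and
 * osc_unpackmd() below mirrors it for the in-memory lov_stripe_md: a
 * NULL buffer pointer queries the required size, and a non-NULL buffer
 * with a NULL source frees it.  A minimal sketch of a hypothetical
 * caller (not code from this file):
 *
 *      struct lov_mds_md *lmm = NULL;
 *      int rc;
 *
 *      rc = osc_packmd(exp, NULL, lsm);   // returns required size only
 *      rc = osc_packmd(exp, &lmm, lsm);   // allocates and packs, LE order
 *      ...
 *      rc = osc_packmd(exp, &lmm, NULL);  // frees *lmm and NULLs it
 */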

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
                (*lsmp)->lsm_object_seq = le64_to_cpu(lmm->lmm_object_seq);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
        }

        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

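/* Copy the capability @capa into the request capsule and flag its
 * presence in the obdo so the OST will verify it (no-op when @capa is
 * NULL). */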
static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

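/* Fill the OST request body from @oinfo: pack the obdo in wire format
 * and append the capability, if any. */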
static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

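/* Reply interpreter for asynchronous getattr: unpack the returned obdo
 * and hand the result to the caller's oi_cb_up() completion callback. */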
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

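/* Queue a getattr RPC on @set; the reply is handled by
 * osc_getattr_interpret() above.  osc_getattr() below is the
 * synchronous variant, which sends the RPC and waits in place. */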
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

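/* Synchronous setattr: pack @oinfo into an OST_SETATTR request, wait
 * for the reply, and copy the returned attributes back into oi_oa. */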
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(sa->sa_oa, &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

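/* Asynchronous setattr: if @rqset is NULL the request is fired and
 * forgotten via ptlrpcd; otherwise the reply is interpreted by
 * osc_setattr_interpret() and @upcall is invoked with @cookie. */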
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* Do the MDS-to-OST setattr asynchronously. */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}

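/* Create an object on the OST.  If *@ea is NULL a temporary stripe md
 * is allocated; the object id/seq from the reply are stored in it and
 * the llog cookie is saved in @oti for MDS-driven recovery. */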
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_seq = oa->o_seq;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

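/* Truncate (punch) an object.  The extent to punch is carried in the
 * obdo: o_size holds the start offset and o_blocks the end, as set up
 * by osc_punch() below.  Completion is reported through @upcall. */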
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo,
                              oinfo->oi_cb_up, oinfo, rqset);
}

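/* Reply interpreter for OST_SYNC: copy the returned obdo back and
 * complete via the caller's oi_cb_up() callback. */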
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_async_args *aa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_oa = body->oa;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
                    obd_size start, obd_size end,
                    struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        if (!oinfo->oi_oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

/* Find and cancel locally the locks matched by @mode in the resource
 * found by @objid.  Found locks are added to the @cancels list.  Returns
 * the number of locks added. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   cfs_list_t *cancels,
                                   ldlm_mode_t mode, int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        cfs_atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

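/* Throttle destroy RPCs: optimistically bump the in-flight counter and
 * check it against cl_max_rpcs_in_flight.  On failure the counter is
 * dropped again, waking any waiter that raced with us. */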
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST
 * reports they were destroyed and sync'd to disk (i.e. transaction
 * committed).  If the client dies, or the OST is down when the object
 * should be destroyed, the records are not cancelled, and when the OST
 * reconnects to the MDS next, it will retrieve the llog unlink logs and
 * then send the log cancellation cookies to the MDS after committing
 * destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * below max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}

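/* Fill in the dirty-accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) so the OST can track how much cache and grant
 * this client holds.  o_undirty is forced to 0 if the local counters
 * look inconsistent. */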
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (cfs_atomic_read(&obd_dirty_pages) -
                   cfs_atomic_read(&obd_dirty_transit_pages) >
                   obd_max_dirty_pages + 1) {
                /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        cfs_atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}

/* The companion to osc_consume_write_grant, called when a brw has
 * completed.  Must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        cfs_atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                cfs_atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        cfs_list_t *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (cfs_atomic_read(&obd_dirty_pages) + 1 >
                    obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
                cfs_list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

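/* Add @grant bytes returned by the OST to the locally available grant. */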
static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
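/* Worked example (hypothetical values): with cl_max_rpcs_in_flight = 8
 * and cl_max_pages_per_rpc = 256, the first shrink targets
 * (8 + 1) * 256 = 2304; once the available grant is already at or below
 * that, the target drops to a single RPC's worth (256). */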
static int osc_shrink_grant(struct client_obd *cli)
{
        long target = (cli->cl_max_rpcs_in_flight + 1) *
                      cli->cl_max_pages_per_rpc;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target)
                target = cli->cl_max_pages_per_rpc;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target);
}

int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int    rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        cfs_list_for_each_entry(client, &item->ti_obd_list,
                                cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                        client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty will
         * drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0, the OSS is probably not running"
                      " with the patch from bug20278 (%ld)\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
                /* workaround for 1.6 servers which do not have
                 * the patch from bug20278 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            cfs_list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT|OBD_BRW_NOCACHE|
                                  OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at http://bugs.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT(pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data, so it is still correct on a redo. */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

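/* Build a bulk read/write (BRW) request: allocate the ptlrpc request
 * and bulk descriptor, pack the obdo/ioobj/niobufs (merging contiguous
 * pages with compatible flags into single niobufs), attach grant and
 * optional checksum information, and return the request via @reqp
 * without sending it. */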
1273 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1274                                 struct lov_stripe_md *lsm, obd_count page_count,
1275                                 struct brw_page **pga,
1276                                 struct ptlrpc_request **reqp,
1277                                 struct obd_capa *ocapa, int reserve,
1278                                 int resend)
1279 {
1280         struct ptlrpc_request   *req;
1281         struct ptlrpc_bulk_desc *desc;
1282         struct ost_body         *body;
1283         struct obd_ioobj        *ioobj;
1284         struct niobuf_remote    *niobuf;
1285         int niocount, i, requested_nob, opc, rc;
1286         struct osc_brw_async_args *aa;
1287         struct req_capsule      *pill;
1288         struct brw_page *pg_prev;
1289
1290         ENTRY;
1291         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1292                 RETURN(-ENOMEM); /* Recoverable */
1293         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1294                 RETURN(-EINVAL); /* Fatal */
1295
1296         if ((cmd & OBD_BRW_WRITE) != 0) {
1297                 opc = OST_WRITE;
1298                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1299                                                 cli->cl_import->imp_rq_pool,
1300                                                 &RQF_OST_BRW_WRITE);
1301         } else {
1302                 opc = OST_READ;
1303                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1304         }
1305         if (req == NULL)
1306                 RETURN(-ENOMEM);
1307
1308         for (niocount = i = 1; i < page_count; i++) {
1309                 if (!can_merge_pages(pga[i - 1], pga[i]))
1310                         niocount++;
1311         }
1312
1313         pill = &req->rq_pill;
1314         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1315                              sizeof(*ioobj));
1316         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1317                              niocount * sizeof(*niobuf));
1318         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1319
1320         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1321         if (rc) {
1322                 ptlrpc_request_free(req);
1323                 RETURN(rc);
1324         }
1325         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1326         ptlrpc_at_set_req_timeout(req);
1327
1328         if (opc == OST_WRITE)
1329                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1330                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1331         else
1332                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1333                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1334
1335         if (desc == NULL)
1336                 GOTO(out, rc = -ENOMEM);
1337         /* NB request now owns desc and will free it when it gets freed */
1338
1339         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1340         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1341         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1342         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1343
1344         lustre_set_wire_obdo(&body->oa, oa);
1345
1346         obdo_to_ioobj(oa, ioobj);
1347         ioobj->ioo_bufcnt = niocount;
1348         osc_pack_capa(req, body, ocapa);
1349         LASSERT (page_count > 0);
1350         pg_prev = pga[0];
1351         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1352                 struct brw_page *pg = pga[i];
1353                 int poff = pg->off & ~CFS_PAGE_MASK;
1354
1355                 LASSERT(pg->count > 0);
1356                 /* make sure there is no gap in the middle of page array */
1357                 LASSERTF(page_count == 1 ||
1358                          (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1359                           ergo(i > 0 && i < page_count - 1,
1360                                poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
1361                           ergo(i == page_count - 1, poff == 0)),
1362                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1363                          i, page_count, pg, pg->off, pg->count);
1364 #ifdef __linux__
1365                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1366                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1367                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1368                          i, page_count,
1369                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1370                          pg_prev->pg, page_private(pg_prev->pg),
1371                          pg_prev->pg->index, pg_prev->off);
1372 #else
1373                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1374                          "i %d p_c %u\n", i, page_count);
1375 #endif
1376                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1377                         (pg->flag & OBD_BRW_SRVLOCK));
1378
1379                 ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
1380                 requested_nob += pg->count;
1381
1382                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1383                         niobuf--;
1384                         niobuf->len += pg->count;
1385                 } else {
1386                         niobuf->offset = pg->off;
1387                         niobuf->len    = pg->count;
1388                         niobuf->flags  = pg->flag;
1389                 }
1390                 pg_prev = pg;
1391         }
1392
1393         LASSERTF((void *)(niobuf - niocount) ==
1394                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1395                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1396                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1397
1398         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1399         if (resend) {
1400                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1401                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1402                         body->oa.o_flags = 0;
1403                 }
1404                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1405         }
1406
1407         if (osc_should_shrink_grant(cli))
1408                 osc_shrink_grant_local(cli, &body->oa);
1409
1410         /* size[REQ_REC_OFF] still sizeof (*body) */
1411         if (opc == OST_WRITE) {
1412                 if (unlikely(cli->cl_checksum) &&
1413                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1414                         /* store cl_cksum_type in a local variable since
1415                          * it can be changed via lprocfs */
1416                         cksum_type_t cksum_type = cli->cl_cksum_type;
1417
1418                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1419                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1420                                 body->oa.o_flags = 0;
1421                         }
1422                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1423                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1424                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1425                                                              page_count, pga,
1426                                                              OST_WRITE,
1427                                                              cksum_type);
1428                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1429                                body->oa.o_cksum);
1430                         /* save this in 'oa', too, for later checking */
1431                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1432                         oa->o_flags |= cksum_type_pack(cksum_type);
1433                 } else {
1434                         /* clear out the checksum flag, in case this is a
1435                          * resend but cl_checksum is no longer set. b=11238 */
1436                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1437                 }
1438                 oa->o_cksum = body->oa.o_cksum;
1439                 /* 1 RC per niobuf */
1440                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1441                                      sizeof(__u32) * niocount);
1442         } else {
1443                 if (unlikely(cli->cl_checksum) &&
1444                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1445                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1446                                 body->oa.o_flags = 0;
1447                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1448                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1449                 }
1450         }
1451         ptlrpc_request_set_replen(req);
1452
1453         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1454         aa = ptlrpc_req_async_args(req);
1455         aa->aa_oa = oa;
1456         aa->aa_requested_nob = requested_nob;
1457         aa->aa_nio_count = niocount;
1458         aa->aa_page_count = page_count;
1459         aa->aa_resends = 0;
1460         aa->aa_ppga = pga;
1461         aa->aa_cli = cli;
1462         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1463         if (ocapa && reserve)
1464                 aa->aa_ocapa = capa_get(ocapa);
1465
1466         *reqp = req;
1467         RETURN(0);
1468
1469  out:
1470         ptlrpc_req_finished(req);
1471         RETURN(rc);
1472 }
1473
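/* Diagnose a write checksum mismatch by recomputing the checksum over the
 * same pages on the client: if the new value matches the server's, the pages
 * changed after we first checksummed them (e.g. mmap IO); if it still
 * matches the original client checksum, the data changed in transit.
 * Returns 0 if the checksums actually agree, 1 after logging the diagnosis. */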
1474 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1475                                 __u32 client_cksum, __u32 server_cksum, int nob,
1476                                 obd_count page_count, struct brw_page **pga,
1477                                 cksum_type_t client_cksum_type)
1478 {
1479         __u32 new_cksum;
1480         char *msg;
1481         cksum_type_t cksum_type;
1482
1483         if (server_cksum == client_cksum) {
1484                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1485                 return 0;
1486         }
1487
1488         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1489                                        oa->o_flags : 0);
1490         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1491                                       cksum_type);
1492
1493         if (cksum_type != client_cksum_type)
1494                 msg = "the server did not use the checksum type specified in "
1495                       "the original request - likely a protocol problem";
1496         else if (new_cksum == server_cksum)
1497                 msg = "changed on the client after we checksummed it - "
1498                       "likely false positive due to mmap IO (bug 11742)";
1499         else if (new_cksum == client_cksum)
1500                 msg = "changed in transit before arrival at OST";
1501         else
1502                 msg = "changed in transit AND doesn't match the original - "
1503                       "likely false positive due to mmap IO (bug 11742)";
1504
1505         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1506                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1507                            msg, libcfs_nid2str(peer->nid),
1508                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1509                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1510                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1511                            oa->o_id,
1512                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1513                            pga[0]->off,
1514                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1515         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1516                "client csum now %x\n", client_cksum, client_cksum_type,
1517                server_cksum, cksum_type, new_cksum);
1518         return 1;
1519 }
1520
1521 /* Note: rc enters this function as the number of bytes transferred */
1522 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1523 {
1524         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1525         const lnet_process_id_t *peer =
1526                         &req->rq_import->imp_connection->c_peer;
1527         struct client_obd *cli = aa->aa_cli;
1528         struct ost_body *body;
1529         __u32 client_cksum = 0;
1530         ENTRY;
1531
1532         if (rc < 0 && rc != -EDQUOT) {
1533                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1534                 RETURN(rc);
1535         }
1536
1537         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1538         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1539         if (body == NULL) {
1540                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1541                 RETURN(-EPROTO);
1542         }
1543
1544         /* set/clear over quota flag for a uid/gid */
1545         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1546             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1547                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1548
1549                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1550                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1551                        body->oa.o_flags);
1552                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1553         }
1554
1555         osc_update_grant(cli, body);
1556
1557         if (rc < 0)
1558                 RETURN(rc);
1559
1560         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1561                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1562
1563         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1564                 if (rc > 0) {
1565                         CERROR("Unexpected +ve rc %d\n", rc);
1566                         RETURN(-EPROTO);
1567                 }
1568                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1569
1570                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1571                         RETURN(-EAGAIN);
1572
1573                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1574                     check_write_checksum(&body->oa, peer, client_cksum,
1575                                          body->oa.o_cksum, aa->aa_requested_nob,
1576                                          aa->aa_page_count, aa->aa_ppga,
1577                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1578                         RETURN(-EAGAIN);
1579
1580                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1581                                      aa->aa_page_count, aa->aa_ppga);
1582                 GOTO(out, rc);
1583         }
1584
1585         /* The rest of this function executes only for OST_READs */
1586
1587         /* if unwrap_bulk failed, return -EAGAIN to retry */
1588         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1589         if (rc < 0)
1590                 GOTO(out, rc = -EAGAIN);
1591
1592         if (rc > aa->aa_requested_nob) {
1593                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1594                        aa->aa_requested_nob);
1595                 RETURN(-EPROTO);
1596         }
1597
1598         if (rc != req->rq_bulk->bd_nob_transferred) {
1599                 CERROR("Unexpected rc %d (%d transferred)\n",
1600                        rc, req->rq_bulk->bd_nob_transferred);
1601                 RETURN(-EPROTO);
1602         }
1603
1604         if (rc < aa->aa_requested_nob)
1605                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1606
1607         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1608                 static int cksum_counter;
1609                 __u32      server_cksum = body->oa.o_cksum;
1610                 char      *via;
1611                 char      *router;
1612                 cksum_type_t cksum_type;
1613
1614                 cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
1615                                                body->oa.o_flags : 0);
1616                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1617                                                  aa->aa_ppga, OST_READ,
1618                                                  cksum_type);
1619
1620                 if (peer->nid == req->rq_bulk->bd_sender) {
1621                         via = router = "";
1622                 } else {
1623                         via = " via ";
1624                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1625                 }
1626
1627                 if (server_cksum == ~0 && rc > 0) {
1628                         CERROR("Protocol error: server %s set the 'checksum' "
1629                                "bit, but didn't send a checksum.  Not fatal, "
1630                                "but please report it at http://bugs.whamcloud.com/\n",
1631                                libcfs_nid2str(peer->nid));
1632                 } else if (server_cksum != client_cksum) {
1633                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1634                                            "%s%s%s inode "DFID" object "
1635                                            LPU64"/"LPU64" extent "
1636                                            "["LPU64"-"LPU64"]\n",
1637                                            req->rq_import->imp_obd->obd_name,
1638                                            libcfs_nid2str(peer->nid),
1639                                            via, router,
1640                                            body->oa.o_valid & OBD_MD_FLFID ?
1641                                                 body->oa.o_parent_seq : (__u64)0,
1642                                            body->oa.o_valid & OBD_MD_FLFID ?
1643                                                 body->oa.o_parent_oid : 0,
1644                                            body->oa.o_valid & OBD_MD_FLFID ?
1645                                                 body->oa.o_parent_ver : 0,
1646                                            body->oa.o_id,
1647                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1648                                                 body->oa.o_seq : (__u64)0,
1649                                            aa->aa_ppga[0]->off,
1650                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1651                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1652                                                                         1);
1653                         CERROR("client %x, server %x, cksum_type %x\n",
1654                                client_cksum, server_cksum, cksum_type);
1655                         cksum_counter = 0;
1656                         aa->aa_oa->o_cksum = client_cksum;
1657                         rc = -EAGAIN;
1658                 } else {
1659                         cksum_counter++;
1660                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1661                         rc = 0;
1662                 }
1663         } else if (unlikely(client_cksum)) {
1664                 static int cksum_missed;
1665
1666                 cksum_missed++;
1667                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1668                         CERROR("Checksum %u requested from %s but not sent\n",
1669                                cksum_missed, libcfs_nid2str(peer->nid));
1670         } else {
1671                 rc = 0;
1672         }
1673 out:
1674         if (rc >= 0)
1675                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1676
1677         RETURN(rc);
1678 }
1679
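/* Synchronous bulk read/write: build the request, queue it and wait for
 * completion.  Bulk timeouts are resent immediately; other recoverable
 * errors are retried after a linearly growing delay (one second per resend
 * so far) until client_should_resend() gives up. */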
1680 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1681                             struct lov_stripe_md *lsm,
1682                             obd_count page_count, struct brw_page **pga,
1683                             struct obd_capa *ocapa)
1684 {
1685         struct ptlrpc_request *req;
1686         int                    rc;
1687         cfs_waitq_t            waitq;
1688         int                    resends = 0;
1689         struct l_wait_info     lwi;
1690
1691         ENTRY;
1692
1693         cfs_waitq_init(&waitq);
1694
1695 restart_bulk:
1696         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1697                                   page_count, pga, &req, ocapa, 0, resends);
1698         if (rc != 0)
1699                 return (rc);
1700
1701         rc = ptlrpc_queue_wait(req);
1702
1703         if (rc == -ETIMEDOUT && req->rq_resend) {
1704                 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1705                 ptlrpc_req_finished(req);
1706                 goto restart_bulk;
1707         }
1708
1709         rc = osc_brw_fini_request(req, rc);
1710
1711         ptlrpc_req_finished(req);
1712         if (osc_recoverable_error(rc)) {
1713                 resends++;
1714                 if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
1715                         CERROR("too many resend retries, returning error\n");
1716                         RETURN(-EIO);
1717                 }
1718
1719                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1720                 l_wait_event(waitq, 0, &lwi);
1721
1722                 goto restart_bulk;
1723         }
1724
1725         RETURN(rc);
1726 }
1727
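/* Rebuild a bulk RPC that failed with a recoverable error and add the
 * replacement to the same request set.  The new request inherits the async
 * args, the pga and the queued oaps of the old one; if any queued page was
 * interrupted meanwhile, -EINTR is returned instead (checked while holding
 * cl_loi_list_lock). */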
1728 int osc_brw_redo_request(struct ptlrpc_request *request,
1729                          struct osc_brw_async_args *aa)
1730 {
1731         struct ptlrpc_request *new_req;
1732         struct ptlrpc_request_set *set = request->rq_set;
1733         struct osc_brw_async_args *new_aa;
1734         struct osc_async_page *oap;
1735         int rc = 0;
1736         ENTRY;
1737
1738         if (!client_should_resend(aa->aa_resends, aa->aa_cli)) {
1739                 CERROR("too many resend retries, returning error\n");
1740                 RETURN(-EIO);
1741         }
1742
1743         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1744
1745         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1746                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1747                                   aa->aa_cli, aa->aa_oa,
1748                                   NULL /* lsm unused by osc currently */,
1749                                   aa->aa_page_count, aa->aa_ppga,
1750                                   &new_req, aa->aa_ocapa, 0, 1);
1751         if (rc)
1752                 RETURN(rc);
1753
1754         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1755
1756         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1757                 if (oap->oap_request != NULL) {
1758                         LASSERTF(request == oap->oap_request,
1759                                  "request %p != oap_request %p\n",
1760                                  request, oap->oap_request);
1761                         if (oap->oap_interrupted) {
1762                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1763                                 ptlrpc_req_finished(new_req);
1764                                 RETURN(-EINTR);
1765                         }
1766                 }
1767         }
1768         /* The new request takes over the pga and oaps from the old request.
1769          * Note that copying a list_head doesn't work; it must be moved. */
1770         aa->aa_resends++;
1771         new_req->rq_interpret_reply = request->rq_interpret_reply;
1772         new_req->rq_async_args = request->rq_async_args;
1773         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1774
1775         new_aa = ptlrpc_req_async_args(new_req);
1776
1777         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1778         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1779         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1780
1781         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1782                 if (oap->oap_request) {
1783                         ptlrpc_req_finished(oap->oap_request);
1784                         oap->oap_request = ptlrpc_request_addref(new_req);
1785                 }
1786         }
1787
1788         new_aa->aa_ocapa = aa->aa_ocapa;
1789         aa->aa_ocapa = NULL;
1790
1791         /* Using ptlrpc_set_add_req() is safe because interpret functions
1792          * run in check_set context.  The only path by which another thread
1793          * can reach the request is the -EINTR case above, and that is
1794          * protected by cl_loi_list_lock. */
1795         ptlrpc_set_add_req(set, new_req);
1796
1797         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1798
1799         DEBUG_REQ(D_INFO, new_req, "new request");
1800         RETURN(0);
1801 }
1802
1803 /*
1804  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1805  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1806  * fine for our small page arrays and doesn't require allocation.  it's an
1807  * insertion sort that swaps elements that are strides apart, shrinking the
1808  * stride down until it's 1 and the array is sorted.
1809  */
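/* A short worked trace: with num = 20 the stride seed loop grows the stride
 * 1 -> 4 -> 13 -> 40 and stops, so the sorting passes below run with strides
 * 13, 4 and finally 1 (the plain insertion-sort pass). */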
1810 static void sort_brw_pages(struct brw_page **array, int num)
1811 {
1812         int stride, i, j;
1813         struct brw_page *tmp;
1814
1815         if (num == 1)
1816                 return;
1817         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1818                 ;
1819
1820         do {
1821                 stride /= 3;
1822                 for (i = stride ; i < num ; i++) {
1823                         tmp = array[i];
1824                         j = i;
1825                         while (j >= stride && array[j - stride]->off > tmp->off) {
1826                                 array[j] = array[j - stride];
1827                                 j -= stride;
1828                         }
1829                         array[j] = tmp;
1830                 }
1831         } while (stride > 1);
1832 }
1833
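/* Count how many leading pages of @pg form one virtually contiguous chunk:
 * the first page may start at a non-zero offset, but every page must end on
 * a page boundary and every subsequent page must start on one.  E.g. with
 * 4 KiB pages, {off 2048, count 2048}, {off 4096, count 4096},
 * {off 8192, count 1024} yields 3 -- a short last page is fine because the
 * transfer ends there. */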
1834 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1835 {
1836         int count = 1;
1837         int offset;
1838         int i = 0;
1839
1840         LASSERT(pages > 0);
1841         offset = pg[i]->off & ~CFS_PAGE_MASK;
1842
1843         for (;;) {
1844                 pages--;
1845                 if (pages == 0)         /* that's all */
1846                         return count;
1847
1848                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1849                         return count;   /* doesn't end on page boundary */
1850
1851                 i++;
1852                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1853                 if (offset != 0)        /* doesn't start on page boundary */
1854                         return count;
1855
1856                 count++;
1857         }
1858 }
1859
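/* Build an array of pointers into the flat @pga array so that callers can
 * sort the pages and advance through them in chunks without moving the
 * brw_page structures themselves. */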
1860 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1861 {
1862         struct brw_page **ppga;
1863         int i;
1864
1865         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1866         if (ppga == NULL)
1867                 return NULL;
1868
1869         for (i = 0; i < count; i++)
1870                 ppga[i] = pga + i;
1871         return ppga;
1872 }
1873
1874 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1875 {
1876         LASSERT(ppga != NULL);
1877         OBD_FREE(ppga, sizeof(*ppga) * count);
1878 }
1879
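/* Synchronous entry point: sort the pages by offset, split them into chunks
 * of at most cl_max_pages_per_rpc unfragmented pages, and issue one
 * osc_brw_internal() call per chunk.  The oa is saved and restored between
 * chunks because each call clobbers it. */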
1880 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1881                    obd_count page_count, struct brw_page *pga,
1882                    struct obd_trans_info *oti)
1883 {
1884         struct obdo *saved_oa = NULL;
1885         struct brw_page **ppga, **orig;
1886         struct obd_import *imp = class_exp2cliimp(exp);
1887         struct client_obd *cli;
1888         int rc, page_count_orig;
1889         ENTRY;
1890
1891         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1892         cli = &imp->imp_obd->u.cli;
1893
1894         if (cmd & OBD_BRW_CHECK) {
1895                 /* The caller just wants to know if there's a chance that this
1896                  * I/O can succeed */
1897
1898                 if (imp->imp_invalid)
1899                         RETURN(-EIO);
1900                 RETURN(0);
1901         }
1902
1903         /* test_brw with a failed create can trip this, maybe others. */
1904         LASSERT(cli->cl_max_pages_per_rpc);
1905
1906         rc = 0;
1907
1908         orig = ppga = osc_build_ppga(pga, page_count);
1909         if (ppga == NULL)
1910                 RETURN(-ENOMEM);
1911         page_count_orig = page_count;
1912
1913         sort_brw_pages(ppga, page_count);
1914         while (page_count) {
1915                 obd_count pages_per_brw;
1916
1917                 if (page_count > cli->cl_max_pages_per_rpc)
1918                         pages_per_brw = cli->cl_max_pages_per_rpc;
1919                 else
1920                         pages_per_brw = page_count;
1921
1922                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1923
1924                 if (saved_oa != NULL) {
1925                         /* restore previously saved oa */
1926                         *oinfo->oi_oa = *saved_oa;
1927                 } else if (page_count > pages_per_brw) {
1928                         /* save a copy of oa (brw will clobber it) */
1929                         OBDO_ALLOC(saved_oa);
1930                         if (saved_oa == NULL)
1931                                 GOTO(out, rc = -ENOMEM);
1932                         *saved_oa = *oinfo->oi_oa;
1933                 }
1934
1935                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1936                                       pages_per_brw, ppga, oinfo->oi_capa);
1937
1938                 if (rc != 0)
1939                         break;
1940
1941                 page_count -= pages_per_brw;
1942                 ppga += pages_per_brw;
1943         }
1944
1945 out:
1946         osc_release_ppga(orig, page_count_orig);
1947
1948         if (saved_oa != NULL)
1949                 OBDO_FREE(saved_oa);
1950
1951         RETURN(rc);
1952 }
1953
1954 /* The companion to osc_enter_cache(), called when @oap leaves the dirty
1955  * accounting, either because writeback completed or because truncate
1956  * happened before writing started.  Must be called with the loi lock held. */
1957 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1958                            int sent)
1959 {
1960         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1961 }
1962
1963
1964 /* Decide whether the pending pages (lop) of a given object warrant an RPC.
1965  * This is used by osc_check_rpcs()->osc_next_loi() and loi_list_maint()
1966  * to quickly find objects that are ready to send an RPC. */
1967 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1968                          int cmd)
1969 {
1970         ENTRY;
1971
1972         if (lop->lop_num_pending == 0)
1973                 RETURN(0);
1974
1975         /* if we have an invalid import we want to drain the queued pages
1976          * by forcing them through rpcs that immediately fail and complete
1977          * the pages.  recovery relies on this to empty the queued pages
1978          * before canceling the locks and evicting down the llite pages */
1979         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1980                 RETURN(1);
1981
1982         /* stream rpcs in queue order as long as there is an urgent page
1983          * queued.  this is our cheap solution for good batching in the case
1984          * where writepage marks some random page in the middle of the file
1985          * as urgent because of, say, memory pressure */
1986         if (!cfs_list_empty(&lop->lop_urgent)) {
1987                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1988                 RETURN(1);
1989         }
1990
1991         if (cmd & OBD_BRW_WRITE) {
1992                 /* trigger a write rpc stream as long as there are dirtiers
1993                  * waiting for space.  as they're waiting, they're not going to
1994                  * create more pages to coalesce with what's waiting. */
1995                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1996                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1997                         RETURN(1);
1998                 }
1999         }
2000         if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc)
2001                 RETURN(1);
2002
2003         RETURN(0);
2004 }
2005
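/* An object warrants a high-priority RPC when the head of its urgent list
 * is an ASYNC_HP page; osc_oap_to_pending() adds ASYNC_HP pages at the head
 * of lop_urgent and merely-urgent pages at the tail, so only the head needs
 * checking. */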
2006 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2007 {
2008         struct osc_async_page *oap;
2009         ENTRY;
2010
2011         if (cfs_list_empty(&lop->lop_urgent))
2012                 RETURN(0);
2013
2014         oap = cfs_list_entry(lop->lop_urgent.next,
2015                          struct osc_async_page, oap_urgent_item);
2016
2017         if (oap->oap_async_flags & ASYNC_HP) {
2018                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2019                 RETURN(1);
2020         }
2021
2022         RETURN(0);
2023 }
2024
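/* Keep @item's membership of @list in sync with @should_be_on without
 * double-adding an already-listed item or deleting an unlinked one. */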
2025 static void on_list(cfs_list_t *item, cfs_list_t *list,
2026                     int should_be_on)
2027 {
2028         if (cfs_list_empty(item) && should_be_on)
2029                 cfs_list_add_tail(item, list);
2030         else if (!cfs_list_empty(item) && !should_be_on)
2031                 cfs_list_del_init(item);
2032 }
2033
2034 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2035  * can find pages to build into rpcs quickly */
2036 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2037 {
2038         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2039             lop_makes_hprpc(&loi->loi_read_lop)) {
2040                 /* HP rpc */
2041                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2042                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2043         } else {
2044                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2045                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2046                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2047                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2048         }
2049
2050         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2051                 loi->loi_write_lop.lop_num_pending);
2052
2053         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2054                 loi->loi_read_lop.lop_num_pending);
2055 }
2056
2057 static void lop_update_pending(struct client_obd *cli,
2058                                struct loi_oap_pages *lop, int cmd, int delta)
2059 {
2060         lop->lop_num_pending += delta;
2061         if (cmd & OBD_BRW_WRITE)
2062                 cli->cl_pending_w_pages += delta;
2063         else
2064                 cli->cl_pending_r_pages += delta;
2065 }
2066
2067 /**
2068  * This is called when a sync waiter receives an interruption.  Its job is to
2069  * get the caller woken as soon as possible.  If its page hasn't been put in
2070  * an rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc
2071  * as desiring interruption, which will forcefully complete the rpc once it
2072  * has timed out.
2073  */
2074 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2075 {
2076         struct loi_oap_pages *lop;
2077         struct lov_oinfo *loi;
2078         int rc = -EBUSY;
2079         ENTRY;
2080
2081         LASSERT(!oap->oap_interrupted);
2082         oap->oap_interrupted = 1;
2083
2084         /* if it's been put in an rpc, interrupt it; only one oap gets a request reference */
2085         if (oap->oap_request != NULL) {
2086                 ptlrpc_mark_interrupted(oap->oap_request);
2087                 ptlrpcd_wake(oap->oap_request);
2088                 ptlrpc_req_finished(oap->oap_request);
2089                 oap->oap_request = NULL;
2090         }
2091
2092         /*
2093          * page completion may be called only if ->cpo_prep() method was
2094          * executed by osc_io_submit(), which also adds the page to the pending list
2095          */
2096         if (!cfs_list_empty(&oap->oap_pending_item)) {
2097                 cfs_list_del_init(&oap->oap_pending_item);
2098                 cfs_list_del_init(&oap->oap_urgent_item);
2099
2100                 loi = oap->oap_loi;
2101                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2102                         &loi->loi_write_lop : &loi->loi_read_lop;
2103                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2104                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2105                 rc = oap->oap_caller_ops->ap_completion(env,
2106                                           oap->oap_caller_data,
2107                                           oap->oap_cmd, NULL, -EINTR);
2108         }
2109
2110         RETURN(rc);
2111 }
2112
2113 /* this is trying to propagate async writeback errors back up to the
2114  * application.  When an async write fails we record the error code in case
2115  * the app later does an fsync, and we force future rpcs to be sync so the app
2116  * can get a sync error and break the cycle of queueing pages whose writeback
2117  * will fail.  force_sync drops once a write with xid >= ar_min_xid succeeds. */
2118 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2119                            int rc)
2120 {
2121         if (rc) {
2122                 if (!ar->ar_rc)
2123                         ar->ar_rc = rc;
2124
2125                 ar->ar_force_sync = 1;
2126                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2127                 return;
2129         }
2130
2131         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2132                 ar->ar_force_sync = 0;
2133 }
2134
2135 void osc_oap_to_pending(struct osc_async_page *oap)
2136 {
2137         struct loi_oap_pages *lop;
2138
2139         if (oap->oap_cmd & OBD_BRW_WRITE)
2140                 lop = &oap->oap_loi->loi_write_lop;
2141         else
2142                 lop = &oap->oap_loi->loi_read_lop;
2143
2144         if (oap->oap_async_flags & ASYNC_HP)
2145                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2146         else if (oap->oap_async_flags & ASYNC_URGENT)
2147                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2148         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2149         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2150 }
2151
2152 /* this must be called holding the loi list lock, which protects exit_cache,
2153  * async_flag maintenance, and oap_request */
2154 static void osc_ap_completion(const struct lu_env *env,
2155                               struct client_obd *cli, struct obdo *oa,
2156                               struct osc_async_page *oap, int sent, int rc)
2157 {
2158         __u64 xid = 0;
2159
2160         ENTRY;
2161         if (oap->oap_request != NULL) {
2162                 xid = ptlrpc_req_xid(oap->oap_request);
2163                 ptlrpc_req_finished(oap->oap_request);
2164                 oap->oap_request = NULL;
2165         }
2166
2167         cfs_spin_lock(&oap->oap_lock);
2168         oap->oap_async_flags = 0;
2169         cfs_spin_unlock(&oap->oap_lock);
2170         oap->oap_interrupted = 0;
2171
2172         if (oap->oap_cmd & OBD_BRW_WRITE) {
2173                 osc_process_ar(&cli->cl_ar, xid, rc);
2174                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2175         }
2176
2177         if (rc == 0 && oa != NULL) {
2178                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2179                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2180                 if (oa->o_valid & OBD_MD_FLMTIME)
2181                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2182                 if (oa->o_valid & OBD_MD_FLATIME)
2183                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2184                 if (oa->o_valid & OBD_MD_FLCTIME)
2185                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2186         }
2187
2188         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2189                                                 oap->oap_cmd, oa, rc);
2190
2191         /* cl_page_completion() drops PG_locked, so a new I/O on the page
2192          * could start; but OSC calls it under lock, and thus we can safely
2193          * add the oap back to pending */
2194         if (rc)
2195                 /* upper layer wants to leave the page on pending queue */
2196                 osc_oap_to_pending(oap);
2197         else
2198                 osc_exit_cache(cli, oap, sent);
2199         EXIT;
2200 }
2201
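/* Writeback work callback: re-run the RPC engine for this client under the
 * list lock.  The final argument to osc_check_rpcs0() indicates we are
 * already in ptlrpcd context, so new RPCs stay on the same thread
 * (PDL_POLICY_SAME). */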
2202 static int brw_queue_work(const struct lu_env *env, void *data)
2203 {
2204         struct client_obd *cli = data;
2205
2206         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2207
2208         client_obd_list_lock(&cli->cl_loi_list_lock);
2209         osc_check_rpcs0(env, cli, 1);
2210         client_obd_list_unlock(&cli->cl_loi_list_lock);
2211         RETURN(0);
2212 }
2213
2214 static int brw_interpret(const struct lu_env *env,
2215                          struct ptlrpc_request *req, void *data, int rc)
2216 {
2217         struct osc_brw_async_args *aa = data;
2218         struct client_obd *cli;
2219         int async;
2220         ENTRY;
2221
2222         rc = osc_brw_fini_request(req, rc);
2223         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2224         if (osc_recoverable_error(rc)) {
2225                 rc = osc_brw_redo_request(req, aa);
2226                 if (rc == 0)
2227                         RETURN(0);
2228         }
2229
2230         if (aa->aa_ocapa) {
2231                 capa_put(aa->aa_ocapa);
2232                 aa->aa_ocapa = NULL;
2233         }
2234
2235         cli = aa->aa_cli;
2236         client_obd_list_lock(&cli->cl_loi_list_lock);
2237
2238         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2239          * is called so we know whether to go to sync BRWs or wait for more
2240          * RPCs to complete */
2241         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2242                 cli->cl_w_in_flight--;
2243         else
2244                 cli->cl_r_in_flight--;
2245
2246         async = cfs_list_empty(&aa->aa_oaps);
2247         if (!async) { /* from osc_send_oap_rpc() */
2248                 struct osc_async_page *oap, *tmp;
2249                 /* the caller may re-use the oap after the completion call so
2250                  * we need to clean it up a little */
2251                 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2252                                              oap_rpc_item) {
2253                         cfs_list_del_init(&oap->oap_rpc_item);
2254                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2255                 }
2256                 OBDO_FREE(aa->aa_oa);
2257         } else { /* from async_internal() */
2258                 obd_count i;
2259                 for (i = 0; i < aa->aa_page_count; i++)
2260                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2261         }
2262         osc_wake_cache_waiters(cli);
2263         osc_check_rpcs0(env, cli, 1);
2264         client_obd_list_unlock(&cli->cl_loi_list_lock);
2265
2266         if (!async)
2267                 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2268                                   req->rq_bulk->bd_nob_transferred);
2269         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2270         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2271
2272         RETURN(rc);
2273 }
2274
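/* Gather the oaps queued on @rpc_list into a single bulk request: allocate
 * the pga and the obdo, attach every page to a new cl_req, sort the pages by
 * offset and hand them to osc_brw_prep_request().  On failure each queued
 * oap is completed with the error so that the pages are not leaked. */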
2275 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2276                                             struct client_obd *cli,
2277                                             cfs_list_t *rpc_list,
2278                                             int page_count, int cmd)
2279 {
2280         struct ptlrpc_request *req;
2281         struct brw_page **pga = NULL;
2282         struct osc_brw_async_args *aa;
2283         struct obdo *oa = NULL;
2284         const struct obd_async_page_ops *ops = NULL;
2285         struct osc_async_page *oap;
2286         struct osc_async_page *tmp;
2287         struct cl_req *clerq = NULL;
2288         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2289         struct ldlm_lock *lock = NULL;
2290         struct cl_req_attr crattr;
2291         int i, rc, mpflag = 0;
2292
2293         ENTRY;
2294         LASSERT(!cfs_list_empty(rpc_list));
2295
2296         if (cmd & OBD_BRW_MEMALLOC)
2297                 mpflag = cfs_memory_pressure_get_and_set();
2298
2299         memset(&crattr, 0, sizeof crattr);
2300         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2301         if (pga == NULL)
2302                 GOTO(out, req = ERR_PTR(-ENOMEM));
2303
2304         OBDO_ALLOC(oa);
2305         if (oa == NULL)
2306                 GOTO(out, req = ERR_PTR(-ENOMEM));
2307
2308         i = 0;
2309         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2310                 struct cl_page *page = osc_oap2cl_page(oap);
2311                 if (ops == NULL) {
2312                         ops = oap->oap_caller_ops;
2313
2314                         clerq = cl_req_alloc(env, page, crt,
2315                                              1 /* only 1-object rpcs for
2316                                                 * now */);
2317                         if (IS_ERR(clerq))
2318                                 GOTO(out, req = (void *)clerq);
2319                         lock = oap->oap_ldlm_lock;
2320                 }
2321                 pga[i] = &oap->oap_brw_page;
2322                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2323                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2324                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2325                 i++;
2326                 cl_req_page_add(env, clerq, page);
2327         }
2328
2329         /* always get the data for the obdo for the rpc */
2330         LASSERT(ops != NULL);
2331         crattr.cra_oa = oa;
2332         crattr.cra_capa = NULL;
2333         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2334         if (lock) {
2335                 oa->o_handle = lock->l_remote_handle;
2336                 oa->o_valid |= OBD_MD_FLHANDLE;
2337         }
2338
2339         rc = cl_req_prep(env, clerq);
2340         if (rc != 0) {
2341                 CERROR("cl_req_prep failed: %d\n", rc);
2342                 GOTO(out, req = ERR_PTR(rc));
2343         }
2344
2345         sort_brw_pages(pga, page_count);
2346         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2347                                   pga, &req, crattr.cra_capa, 1, 0);
2348         if (rc != 0) {
2349                 CERROR("prep_req failed: %d\n", rc);
2350                 GOTO(out, req = ERR_PTR(rc));
2351         }
2352
2353         if (cmd & OBD_BRW_MEMALLOC)
2354                 req->rq_memalloc = 1;
2355
2356         /* Need to update the timestamps after the request is built in case
2357          * we race with setattr (locally or in queue at OST).  If OST gets
2358          * later setattr before earlier BRW (as determined by the request xid),
2359          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2360          * way to do this in a single call.  bug 10150 */
2361         cl_req_attr_set(env, clerq, &crattr,
2362                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2363
2364         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2365         aa = ptlrpc_req_async_args(req);
2366         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2367         cfs_list_splice(rpc_list, &aa->aa_oaps);
2368         CFS_INIT_LIST_HEAD(rpc_list);
2369         aa->aa_clerq = clerq;
2370 out:
2371         if (cmd & OBD_BRW_MEMALLOC)
2372                 cfs_memory_pressure_restore(mpflag);
2373
2374         capa_put(crattr.cra_capa);
2375         if (IS_ERR(req)) {
2376                 if (oa)
2377                         OBDO_FREE(oa);
2378                 if (pga)
2379                         OBD_FREE(pga, sizeof(*pga) * page_count);
2380                 /* this should happen rarely and is pretty bad; it makes the
2381                  * pending list not follow the dirty order */
2382                 client_obd_list_lock(&cli->cl_loi_list_lock);
2383                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2384                         cfs_list_del_init(&oap->oap_rpc_item);
2385
2386                         /* queued sync pages can be torn down while the pages
2387                          * were between the pending list and the rpc */
2388                         if (oap->oap_interrupted) {
2389                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2390                                 osc_ap_completion(env, cli, NULL, oap, 0,
2391                                                   oap->oap_count);
2392                                 continue;
2393                         }
2394                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2395                 }
2396                 if (clerq && !IS_ERR(clerq))
2397                         cl_req_completion(env, clerq, PTR_ERR(req));
2398         }
2399         RETURN(req);
2400 }
2401
2402 /**
2403  * prepare pages for ASYNC io and put pages in send queue.
2404  *
2405  * \param cmd OBD_BRW_* macros
2406  * \param lop pending pages
2407  *
2408  * \return zero if no page added to send queue.
2409  * \return 1 if pages successfully added to send queue.
2410  * \return negative on errors.
2411  */
2412 static int
2413 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2414                  struct lov_oinfo *loi, int cmd,
2415                  struct loi_oap_pages *lop, pdl_policy_t pol)
2416 {
2417         struct ptlrpc_request *req;
2418         obd_count page_count = 0;
2419         struct osc_async_page *oap = NULL, *tmp;
2420         struct osc_brw_async_args *aa;
2421         const struct obd_async_page_ops *ops;
2422         CFS_LIST_HEAD(rpc_list);
2423         int srvlock = 0, mem_tight = 0;
2424         struct cl_object *clob = NULL;
2425         obd_off starting_offset = OBD_OBJECT_EOF;
2426         unsigned int ending_offset;
2427         int starting_page_off = 0;
2428         ENTRY;
2429
2430         /* ASYNC_HP pages first.  At present, when the lock covering the
2431          * pages is to be canceled, those pages will be sent out with
2432          * ASYNC_HP.  We have to send them out as soon as possible. */
2433         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2434                 if (oap->oap_async_flags & ASYNC_HP)
2435                         cfs_list_move(&oap->oap_pending_item, &lop->lop_pending);
2436                 if (++page_count >= cli->cl_max_pages_per_rpc)
2437                         break;
2438         }
2439         page_count = 0;
2440
2441         /* first we find the pages we're allowed to work with */
2442         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2443                                      oap_pending_item) {
2444                 ops = oap->oap_caller_ops;
2445
2446                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2447                          "magic 0x%x\n", oap, oap->oap_magic);
2448
2449                 if (clob == NULL) {
2450                         /* pin object in memory, so that completion call-backs
2451                          * can be safely called under client_obd_list lock. */
2452                         clob = osc_oap2cl_page(oap)->cp_obj;
2453                         cl_object_get(clob);
2454                 }
2455
2456                 if (page_count != 0 &&
2457                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2458                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2459                                " oap %p, page %p, srvlock %u\n",
2460                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2461                         break;
2462                 }
2463
2464                 /* If there is a gap at the start of this page, it can't merge
2465                  * with any previous page, so we'll hand the network a
2466                  * "fragmented" page array that it can't transfer in 1 RDMA */
2467                 if (oap->oap_obj_off < starting_offset) {
2468                         if (starting_page_off != 0)
2469                                 break;
2470
2471                         starting_page_off = oap->oap_page_off;
2472                         starting_offset = oap->oap_obj_off + starting_page_off;
2473                 } else if (oap->oap_page_off != 0)
2474                         break;
2475
2476                 /* in llite being 'ready' equates to the page being locked
2477                  * until completion unlocks it.  commit_write submits a page
2478                  * as not ready because its unlock will happen unconditionally
2479                  * as the call returns.  if we race with commit_write giving
2480                  * us that page we don't want to create a hole in the page
2481                  * stream, so we stop and leave the rpc to be fired by
2482                  * another dirtier or kupdated interval (the not ready page
2483                  * will still be on the dirty list).  we could call in
2484                  * at the end of ll_file_write to process the queue again. */
2485                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2486                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2487                                                     cmd);
2488                         if (rc < 0)
2489                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2490                                                 "instead of ready\n", oap,
2491                                                 oap->oap_page, rc);
2492                         switch (rc) {
2493                         case -EAGAIN:
2494                                 /* llite is telling us that the page is still
2495                                  * in commit_write and that we should try
2496                                  * and put it in an rpc again later.  we
2497                                  * break out of the loop so we don't create
2498                                  * a hole in the sequence of pages in the rpc
2499                                  * stream.*/
2500                                  * stream. */
2501                                 break;
2502                         case -EINTR:
2503                                 /* the io isn't needed; tell the checks
2504                                  * below to complete the rpc with EINTR */
2505                                 cfs_spin_lock(&oap->oap_lock);
2506                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2507                                 cfs_spin_unlock(&oap->oap_lock);
2508                                 oap->oap_count = -EINTR;
2509                                 break;
2510                         case 0:
2511                                 cfs_spin_lock(&oap->oap_lock);
2512                                 oap->oap_async_flags |= ASYNC_READY;
2513                                 cfs_spin_unlock(&oap->oap_lock);
2514                                 break;
2515                         default:
2516                                 LASSERTF(0, "oap %p page %p returned %d "
2517                                             "from make_ready\n", oap,
2518                                             oap->oap_page, rc);
2519                                 break;
2520                         }
2521                 }
2522                 if (oap == NULL)
2523                         break;
2524
2525                 /* take the page out of our book-keeping */
2526                 cfs_list_del_init(&oap->oap_pending_item);
2527                 lop_update_pending(cli, lop, cmd, -1);
2528                 cfs_list_del_init(&oap->oap_urgent_item);
2529
2530                 /* ask the caller for the size of the io as the rpc leaves. */
2531                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2532                         oap->oap_count =
2533                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2534                                                       cmd);
2535                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2536                 }
2537                 if (oap->oap_count <= 0) {
2538                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2539                                oap->oap_count);
2540                         osc_ap_completion(env, cli, NULL,
2541                                           oap, 0, oap->oap_count);
2542                         continue;
2543                 }
2544
2545                 /* now put the page back in our accounting */
2546                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2547                 if (page_count++ == 0)
2548                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2549
2550                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2551                         mem_tight = 1;
2552
2553                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2554                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2555                  * have the same alignment as the initial writes that allocated
2556                  * extents on the server. */
2557                 ending_offset = oap->oap_obj_off + oap->oap_page_off +
2558                                 oap->oap_count;
2559                 if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
2560                         break;
2561
2562                 if (page_count >= cli->cl_max_pages_per_rpc)
2563                         break;
2564
2565                 /* If there is a gap at the end of this page, it can't merge
2566                  * with any subsequent pages, so we'll hand the network a
2567                  * "fragmented" page array that it can't transfer in 1 RDMA */
2568                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2569                         break;
2570         }
2571
2572         osc_wake_cache_waiters(cli);
2573
2574         loi_list_maint(cli, loi);
2575
2576         client_obd_list_unlock(&cli->cl_loi_list_lock);
2577
2578         if (clob != NULL)
2579                 cl_object_put(env, clob);
2580
2581         if (page_count == 0) {
2582                 client_obd_list_lock(&cli->cl_loi_list_lock);
2583                 RETURN(0);
2584         }
2585
2586         req = osc_build_req(env, cli, &rpc_list, page_count,
2587                             mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2588         if (IS_ERR(req)) {
2589                 LASSERT(cfs_list_empty(&rpc_list));
2590                 loi_list_maint(cli, loi);
2591                 RETURN(PTR_ERR(req));
2592         }
2593
2594         aa = ptlrpc_req_async_args(req);
2595
2596         starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
2597         if (cmd == OBD_BRW_READ) {
2598                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2599                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2600                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2601                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2602         } else {
2603                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2604                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2605                                  cli->cl_w_in_flight);
2606                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2607                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2608         }
2609
2610         client_obd_list_lock(&cli->cl_loi_list_lock);
2611
2612         if (cmd == OBD_BRW_READ)
2613                 cli->cl_r_in_flight++;
2614         else
2615                 cli->cl_w_in_flight++;
2616
2617         /* queued sync pages can be torn down while the pages
2618          * were between the pending list and the rpc */
2619         tmp = NULL;
2620         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2621                 /* only one oap gets a request reference */
2622                 if (tmp == NULL)
2623                         tmp = oap;
2624                 if (oap->oap_interrupted && !req->rq_intr) {
2625                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2626                                oap, req);
2627                         ptlrpc_mark_interrupted(req);
2628                 }
2629         }
2630         if (tmp != NULL)
2631                 tmp->oap_request = ptlrpc_request_addref(req);
2632
2633         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2634                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2635
2636         req->rq_interpret_reply = brw_interpret;
2637
2638         /* XXX: Maybe the caller can check the RPC bulk descriptor to see which
2639          *      CPU/NUMA node the majority of pages were allocated on, and try
2640          *      to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED)
2641          *      to reduce cross-CPU memory traffic.
2642          *
2643          *      But on the other hand, we expect that multiple ptlrpcd threads
2644          *      and the initial write sponsor can run in parallel, especially
2645          *      when data checksum is enabled, which is CPU-bound operation and
2646          *      single ptlrpcd thread cannot process in time. So more ptlrpcd
2647          *      threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
2648          */
2649         ptlrpcd_add_req(req, pol, -1);
2650         RETURN(1);
2651 }
2652
2653 #define LOI_DEBUG(LOI, STR, args...)                                     \
2654         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2655                !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
2656                !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
2657                (LOI)->loi_write_lop.lop_num_pending,                     \
2658                !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
2659                (LOI)->loi_read_lop.lop_num_pending,                      \
2660                !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
2661                args)
2662
2663 /* This is called by osc_check_rpcs() to find which objects have pages that
2664  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2665 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2666 {
2667         ENTRY;
2668
2669         /* First return objects that have blocked locks so that they
2670          * will be flushed quickly and other clients can get the lock,
2671          * then objects which have pages ready to be stuffed into RPCs */
2672         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2673                 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2674                                       struct lov_oinfo, loi_hp_ready_item));
2675         if (!cfs_list_empty(&cli->cl_loi_ready_list))
2676                 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2677                                       struct lov_oinfo, loi_ready_item));
2678
2679         /* then if we have cache waiters, return all objects with queued
2680          * writes.  This is especially important when many small files
2681          * have filled up the cache and not been fired into rpcs because
2682          * they don't pass the nr_pending/object threshold */
2683         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2684             !cfs_list_empty(&cli->cl_loi_write_list))
2685                 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2686                                       struct lov_oinfo, loi_write_item));
2687
2688         /* then return all queued objects when we have an invalid import
2689          * so that they get flushed */
2690         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2691                 if (!cfs_list_empty(&cli->cl_loi_write_list))
2692                         RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2693                                               struct lov_oinfo,
2694                                               loi_write_item));
2695                 if (!cfs_list_empty(&cli->cl_loi_read_list))
2696                         RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2697                                               struct lov_oinfo, loi_read_item));
2698         }
2699         RETURN(NULL);
2700 }
2701
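/* Check whether this client is already at its RPC concurrency limit.  Returns
 * true when rpcs_in_flight(cli) has reached cl_max_rpcs_in_flight; if the
 * next urgent page on either the write or the read list is marked ASYNC_HP,
 * the limit is raised by one so a high-priority RPC can still be sent. */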
2702 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2703 {
2704         struct osc_async_page *oap;
2705         int hprpc = 0;
2706
2707         if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2708                 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2709                                      struct osc_async_page, oap_urgent_item);
2710                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2711         }
2712
2713         if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2714                 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2715                                      struct osc_async_page, oap_urgent_item);
2716                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2717         }
2718
2719         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2720 }
2721
2722 /* called with the loi list lock held */
2723 static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc)
2724 {
2725         struct lov_oinfo *loi;
2726         int rc = 0, race_counter = 0;
2727         pdl_policy_t pol;
2728         ENTRY;
2729
2730         pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND;
2731
2732         while ((loi = osc_next_loi(cli)) != NULL) {
2733                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2734
2735                 if (osc_max_rpc_in_flight(cli, loi))
2736                         break;
2737
2738                 /* attempt some read/write balancing by alternating between
2739                  * reads and writes in an object.  The makes_rpc checks here
2740                  * would be redundant if we were getting read/write work items
2741                  * instead of objects.  We don't want send_oap_rpc to drain a
2742                  * partial read pending queue when we're given this object to
2743                  * do write I/O on while there are cache waiters. */
2744                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2745                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2746                                               &loi->loi_write_lop, pol);
2747                         if (rc < 0) {
2748                                 CERROR("Write request failed with %d\n", rc);
2749
2750                                 /* osc_send_oap_rpc failed, mostly because of
2751                                  * memory pressure.
2752                                  *
2753                                  * We can't break here, because if:
2754                                  *  - a page was submitted by osc_io_submit,
2755                                  *    so that page is locked;
2756                                  *  - no request is in flight; and
2757                                  *  - no subsequent request will be sent,
2758                                  * then the system will be in a live-lock
2759                                  * state, because there is no chance to
2760                                  * call osc_io_unplug() and osc_check_rpcs()
2761                                  * any more.  pdflush can't help in this
2762                                  * case, because it might be blocked on
2763                                  * grabbing the page lock as mentioned above.
2764                                  *
2765                                  * Anyway, continue to drain pages. */
2766                                 /* break; */
2767                         }
2768
2769                         if (rc > 0)
2770                                 race_counter = 0;
2771                         else if (rc == 0)
2772                                 race_counter++;
2773                 }
2774                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2775                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2776                                               &loi->loi_read_lop, pol);
2777                         if (rc < 0)
2778                                 CERROR("Read request failed with %d\n", rc);
2779
2780                         if (rc > 0)
2781                                 race_counter = 0;
2782                         else if (rc == 0)
2783                                 race_counter++;
2784                 }
2785
2786                 /* attempt some inter-object balancing by issuing rpcs
2787                  * for each object in turn */
2788                 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2789                         cfs_list_del_init(&loi->loi_hp_ready_item);
2790                 if (!cfs_list_empty(&loi->loi_ready_item))
2791                         cfs_list_del_init(&loi->loi_ready_item);
2792                 if (!cfs_list_empty(&loi->loi_write_item))
2793                         cfs_list_del_init(&loi->loi_write_item);
2794                 if (!cfs_list_empty(&loi->loi_read_item))
2795                         cfs_list_del_init(&loi->loi_read_item);
2796
2797                 loi_list_maint(cli, loi);
2798
2799                 /* send_oap_rpc fails with 0 when make_ready tells it to
2800                  * back off.  llite's make_ready does this when it tries
2801                  * to lock a page queued for write that is already locked.
2802                  * we want to try sending rpcs from many objects, but we
2803                  * don't want to spin failing with 0.  */
2804                 if (race_counter == 10)
2805                         break;
2806         }
2807 }
2808
2809 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2810 {
2811         osc_check_rpcs0(env, cli, 0);
2812 }
2813
2814 /* we're trying to queue a page in the osc so we're subject to the
2815  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2816  * If the osc's queued pages are already at that limit, then we want to sleep
2817  * until there is space in the osc's queue for us.  We also may be waiting for
2818  * write credits from the OST if there are RPCs in flight that may return some
2819  * before we fall back to sync writes.
2820  *
2821  * We need this to know whether our allocation was granted in the presence of signals */
2822 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2823 {
2824         int rc;
2825         ENTRY;
2826         client_obd_list_lock(&cli->cl_loi_list_lock);
2827         rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2828         client_obd_list_unlock(&cli->cl_loi_list_lock);
2829         RETURN(rc);
2830 }
2831
2832 /**
2833  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2834  * is available.
2835  */
2836 int osc_enter_cache_try(const struct lu_env *env,
2837                         struct client_obd *cli, struct lov_oinfo *loi,
2838                         struct osc_async_page *oap, int transient)
2839 {
2840         int has_grant;
2841
2842         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2843         if (has_grant) {
2844                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2845                 if (transient) {
2846                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2847                         cfs_atomic_inc(&obd_dirty_transit_pages);
2848                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2849                 }
2850         }
2851         return has_grant;
2852 }
2853
2854 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2855  * grant or cache space. */
2856 static int osc_enter_cache(const struct lu_env *env,
2857                            struct client_obd *cli, struct lov_oinfo *loi,
2858                            struct osc_async_page *oap)
2859 {
2860         struct osc_cache_waiter ocw;
2861         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2862
2863         ENTRY;
2864
2865         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2866                "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2867                cli->cl_dirty_max, obd_max_dirty_pages,
2868                cli->cl_lost_grant, cli->cl_avail_grant);
2869
2870         /* force the caller to try sync io.  this can jump the list
2871          * of queued writes and create a discontiguous rpc stream */
2872         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
2873             cli->cl_dirty_max < CFS_PAGE_SIZE     ||
2874             cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
2875                 RETURN(-EDQUOT);
2876
2877         /* Hopefully normal case - cache space and write credits available */
2878         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2879             cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2880             osc_enter_cache_try(env, cli, loi, oap, 0))
2881                 RETURN(0);
2882
2883         /* It is safe to block as a cache waiter as long as there is grant
2884          * space available or the hope of additional grant being returned
2885          * when an in-flight write completes.  Using the writeback cache
2886          * if possible is preferable to sending the data synchronously
2887          * because write pages can then be merged into large requests.
2888          * The addition of this cache waiter will cause pending write
2889          * pages to be sent immediately. */
2890         if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2891                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2892                 cfs_waitq_init(&ocw.ocw_waitq);
2893                 ocw.ocw_oap = oap;
2894                 ocw.ocw_rc = 0;
2895
2896                 loi_list_maint(cli, loi);
2897                 osc_check_rpcs(env, cli);
2898                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2899
2900                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2901                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2902
2903                 client_obd_list_lock(&cli->cl_loi_list_lock);
2904                 if (!cfs_list_empty(&ocw.ocw_entry)) {
2905                         cfs_list_del(&ocw.ocw_entry);
2906                         RETURN(-EINTR);
2907                 }
2908                 RETURN(ocw.ocw_rc);
2909         }
2910
2911         RETURN(-EDQUOT);
2912 }
2913
2914
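/* Initialize the osc_async_page that lives in the caller-provided buffer
 * (*res) for I/O on @page at byte @offset within the object.  When @page is
 * NULL nothing is initialized and the rounded size of an osc_async_page is
 * returned instead, so callers typically make a first call with a NULL page
 * to learn how much space to reserve, then call again with the real page to
 * fill it in. */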
2915 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2916                         struct lov_oinfo *loi, cfs_page_t *page,
2917                         obd_off offset, const struct obd_async_page_ops *ops,
2918                         void *data, void **res, int nocache,
2919                         struct lustre_handle *lockh)
2920 {
2921         struct osc_async_page *oap;
2922
2923         ENTRY;
2924
2925         if (!page)
2926                 return cfs_size_round(sizeof(*oap));
2927
2928         oap = *res;
2929         oap->oap_magic = OAP_MAGIC;
2930         oap->oap_cli = &exp->exp_obd->u.cli;
2931         oap->oap_loi = loi;
2932
2933         oap->oap_caller_ops = ops;
2934         oap->oap_caller_data = data;
2935
2936         oap->oap_page = page;
2937         oap->oap_obj_off = offset;
2938         if (!client_is_remote(exp) &&
2939             cfs_capable(CFS_CAP_SYS_RESOURCE))
2940                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2941
2942         LASSERT(!(offset & ~CFS_PAGE_MASK));
2943
2944         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2945         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2946         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2947         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2948
2949         cfs_spin_lock_init(&oap->oap_lock);
2950         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2951         RETURN(0);
2952 }
2953
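/* Queue a prepared async page for I/O.  For writes this checks the file
 * owner's and group's quota and enters the dirty-page cache (possibly
 * blocking for grant), then puts the oap on its object's pending list and
 * queues the writeback work if enough pages have accumulated to be worth
 * an RPC. */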
2954 int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
2955                        struct lov_stripe_md *lsm, struct lov_oinfo *loi,
2956                        struct osc_async_page *oap, int cmd, int off,
2957                        int count, obd_flag brw_flags, enum async_flags async_flags)
2958 {
2959         struct client_obd *cli = &exp->exp_obd->u.cli;
2960         int rc = 0;
2961         ENTRY;
2962
2963         if (oap->oap_magic != OAP_MAGIC)
2964                 RETURN(-EINVAL);
2965
2966         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2967                 RETURN(-EIO);
2968
2969         if (!cfs_list_empty(&oap->oap_pending_item) ||
2970             !cfs_list_empty(&oap->oap_urgent_item) ||
2971             !cfs_list_empty(&oap->oap_rpc_item))
2972                 RETURN(-EBUSY);
2973
2974         /* check if the file's owner/group is over quota */
2975         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2976                 struct cl_object *obj;
2977                 struct cl_attr    attr; /* XXX put attr into thread info */
2978                 unsigned int qid[MAXQUOTAS];
2979
2980                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2981
2982                 cl_object_attr_lock(obj);
2983                 rc = cl_object_attr_get(env, obj, &attr);
2984                 cl_object_attr_unlock(obj);
2985
2986                 qid[USRQUOTA] = attr.cat_uid;
2987                 qid[GRPQUOTA] = attr.cat_gid;
2988                 if (rc == 0 &&
2989                     osc_quota_chkdq(cli, qid) == NO_QUOTA)
2990                         rc = -EDQUOT;
2991                 if (rc)
2992                         RETURN(rc);
2993         }
2994
2995         if (loi == NULL)
2996                 loi = lsm->lsm_oinfo[0];
2997
2998         client_obd_list_lock(&cli->cl_loi_list_lock);
2999
3000         LASSERT(off + count <= CFS_PAGE_SIZE);
3001         oap->oap_cmd = cmd;
3002         oap->oap_page_off = off;
3003         oap->oap_count = count;
3004         oap->oap_brw_flags = brw_flags;
3005         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3006         if (cfs_memory_pressure_get())
3007                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3008         cfs_spin_lock(&oap->oap_lock);
3009         oap->oap_async_flags = async_flags;
3010         cfs_spin_unlock(&oap->oap_lock);
3011
3012         if (cmd & OBD_BRW_WRITE) {
3013                 rc = osc_enter_cache(env, cli, loi, oap);
3014                 if (rc) {
3015                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3016                         RETURN(rc);
3017                 }
3018         }
3019
3020         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3021                   cmd);
3022
3023         osc_oap_to_pending(oap);
3024         loi_list_maint(cli, loi);
3025         if (!osc_max_rpc_in_flight(cli, loi) &&
3026             lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
3027                 LASSERT(cli->cl_writeback_work != NULL);
3028                 rc = ptlrpcd_queue_work(cli->cl_writeback_work);
3029
3030                 CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
3031                        cli, rc);
3032         }
3033         client_obd_list_unlock(&cli->cl_loi_list_lock);
3034
3035         RETURN(0);
3036 }
3037
3038 /* aka (~was & now & flag), but this is clearer :) */
3039 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
3040
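/* Merge @async_flags into an already-pending oap.  Newly-set ASYNC_URGENT
 * moves the page onto its list's urgent queue (at the head for ASYNC_HP
 * pages, at the tail otherwise); flags that are already set are a no-op. */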
3041 int osc_set_async_flags_base(struct client_obd *cli,
3042                              struct lov_oinfo *loi, struct osc_async_page *oap,
3043                              obd_flag async_flags)
3044 {
3045         struct loi_oap_pages *lop;
3046         int flags = 0;
3047         ENTRY;
3048
3049         LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3050
3051         if (oap->oap_cmd & OBD_BRW_WRITE) {
3052                 lop = &loi->loi_write_lop;
3053         } else {
3054                 lop = &loi->loi_read_lop;
3055         }
3056
3057         if ((oap->oap_async_flags & async_flags) == async_flags)
3058                 RETURN(0);
3059
3060         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3061                 flags |= ASYNC_READY;
3062
3063         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3064             cfs_list_empty(&oap->oap_rpc_item)) {
3065                 if (oap->oap_async_flags & ASYNC_HP)
3066                         cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3067                 else
3068                         cfs_list_add_tail(&oap->oap_urgent_item,
3069                                           &lop->lop_urgent);
3070                 flags |= ASYNC_URGENT;
3071                 loi_list_maint(cli, loi);
3072         }
3073         cfs_spin_lock(&oap->oap_lock);
3074         oap->oap_async_flags |= flags;
3075         cfs_spin_unlock(&oap->oap_lock);
3076
3077         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3078                         oap->oap_async_flags);
3079         RETURN(0);
3080 }
3081
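/* Undo osc_queue_async_io() for a page that has not yet been put into an
 * RPC: release its grant, wake any cache waiters, and remove it from the
 * urgent and pending lists.  Fails with -EBUSY if the page is already part
 * of an in-flight RPC. */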
3082 int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
3083                             struct lov_oinfo *loi, struct osc_async_page *oap)
3084 {
3085         struct client_obd *cli = &exp->exp_obd->u.cli;
3086         struct loi_oap_pages *lop;
3087         int rc = 0;
3088         ENTRY;
3089
3090         if (oap->oap_magic != OAP_MAGIC)
3091                 RETURN(-EINVAL);
3092
3093         if (loi == NULL)
3094                 loi = lsm->lsm_oinfo[0];
3095
3096         if (oap->oap_cmd & OBD_BRW_WRITE) {
3097                 lop = &loi->loi_write_lop;
3098         } else {
3099                 lop = &loi->loi_read_lop;
3100         }
3101
3102         client_obd_list_lock(&cli->cl_loi_list_lock);
3103
3104         if (!cfs_list_empty(&oap->oap_rpc_item))
3105                 GOTO(out, rc = -EBUSY);
3106
3107         osc_exit_cache(cli, oap, 0);
3108         osc_wake_cache_waiters(cli);
3109
3110         if (!cfs_list_empty(&oap->oap_urgent_item)) {
3111                 cfs_list_del_init(&oap->oap_urgent_item);
3112                 cfs_spin_lock(&oap->oap_lock);
3113                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3114                 cfs_spin_unlock(&oap->oap_lock);
3115         }
3116         if (!cfs_list_empty(&oap->oap_pending_item)) {
3117                 cfs_list_del_init(&oap->oap_pending_item);
3118                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3119         }
3120         loi_list_maint(cli, loi);
3121         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3122 out:
3123         client_obd_list_unlock(&cli->cl_loi_list_lock);
3124         RETURN(rc);
3125 }
3126
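/* Attach einfo->ei_cbdata to @lock under the ast guard, verifying that the
 * lock's callbacks match those in @einfo.  Returns 1 if the lock now carries
 * this data (either it was unset or already equal), 0 if some other data is
 * already attached. */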
3127 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
3128                                         struct ldlm_enqueue_info *einfo)
3129 {
3130         void *data = einfo->ei_cbdata;
3131         int set = 0;
3132
3133         LASSERT(lock != NULL);
3134         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3135         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3136         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3137         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3138
3139         lock_res_and_lock(lock);
3140         cfs_spin_lock(&osc_ast_guard);
3141
3142         if (lock->l_ast_data == NULL)
3143                 lock->l_ast_data = data;
3144         if (lock->l_ast_data == data)
3145                 set = 1;
3146
3147         cfs_spin_unlock(&osc_ast_guard);
3148         unlock_res_and_lock(lock);
3149
3150         return set;
3151 }
3152
3153 static int osc_set_data_with_check(struct lustre_handle *lockh,
3154                                    struct ldlm_enqueue_info *einfo)
3155 {
3156         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3157         int set = 0;
3158
3159         if (lock != NULL) {
3160                 set = osc_set_lock_data_with_check(lock, einfo);
3161                 LDLM_LOCK_PUT(lock);
3162         } else
3163                 CERROR("lockh %p, data %p - client evicted?\n",
3164                        lockh, einfo->ei_cbdata);
3165         return set;
3166 }
3167
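/* Walk all cached DLM locks on this object's resource, applying the caller's
 * @replace iterator to each (typically to change or clear the l_ast_data
 * they carry). */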
3168 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3169                              ldlm_iterator_t replace, void *data)
3170 {
3171         struct ldlm_res_id res_id;
3172         struct obd_device *obd = class_exp2obd(exp);
3173
3174         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3175         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3176         return 0;
3177 }
3178
3179 /* find any ldlm lock of the inode in osc
3180  * return 0    if no lock is found
3181  *        1    if one is found
3182  *      < 0    on error */
3183 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3184                            ldlm_iterator_t replace, void *data)
3185 {
3186         struct ldlm_res_id res_id;
3187         struct obd_device *obd = class_exp2obd(exp);
3188         int rc = 0;
3189
3190         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3191         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3192         if (rc == LDLM_ITER_STOP)
3193                 return(1);
3194         if (rc == LDLM_ITER_CONTINUE)
3195                 return(0);
3196         return(rc);
3197 }
3198
3199 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3200                             obd_enqueue_update_f upcall, void *cookie,
3201                             int *flags, int agl, int rc)
3202 {
3203         int intent = *flags & LDLM_FL_HAS_INTENT;
3204         ENTRY;
3205
3206         if (intent) {
3207                 /* The request was created before ldlm_cli_enqueue call. */
3208                 if (rc == ELDLM_LOCK_ABORTED) {
3209                         struct ldlm_reply *rep;
3210                         rep = req_capsule_server_get(&req->rq_pill,
3211                                                      &RMF_DLM_REP);
3212
3213                         LASSERT(rep != NULL);
3214                         if (rep->lock_policy_res1)
3215                                 rc = rep->lock_policy_res1;
3216                 }
3217         }
3218
3219         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
3220             (rc == 0)) {
3221                 *flags |= LDLM_FL_LVB_READY;
3222                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3223                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3224         }
3225
3226         /* Call the update callback. */
3227         rc = (*upcall)(cookie, rc);
3228         RETURN(rc);
3229 }
3230
3231 static int osc_enqueue_interpret(const struct lu_env *env,
3232                                  struct ptlrpc_request *req,
3233                                  struct osc_enqueue_args *aa, int rc)
3234 {
3235         struct ldlm_lock *lock;
3236         struct lustre_handle handle;
3237         __u32 mode;
3238         struct ost_lvb *lvb;
3239         __u32 lvb_len;
3240         int *flags = aa->oa_flags;
3241
3242         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3243          * might be freed anytime after lock upcall has been called. */
3244         lustre_handle_copy(&handle, aa->oa_lockh);
3245         mode = aa->oa_ei->ei_mode;
3246
3247         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3248          * be valid. */
3249         lock = ldlm_handle2lock(&handle);
3250
3251         /* Take an additional reference so that a blocking AST that
3252          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3253          * to arrive after an upcall has been executed by
3254          * osc_enqueue_fini(). */
3255         ldlm_lock_addref(&handle, mode);
3256
3257         /* Let the CP AST grant the lock first. */
3258         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3259
3260         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
3261                 lvb = NULL;
3262                 lvb_len = 0;
3263         } else {
3264                 lvb = aa->oa_lvb;
3265                 lvb_len = sizeof(*aa->oa_lvb);
3266         }
3267
3268         /* Complete obtaining the lock procedure. */
3269         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3270                                    mode, flags, lvb, lvb_len, &handle, rc);
3271         /* Complete osc stuff. */
3272         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
3273                               flags, aa->oa_agl, rc);
3274
3275         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3276
3277         /* Release the lock for async request. */
3278         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3279                 /*
3280                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3281                  * not already released by
3282                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3283                  */
3284                 ldlm_lock_decref(&handle, mode);
3285
3286         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3287                  aa->oa_lockh, req, aa);
3288         ldlm_lock_decref(&handle, mode);
3289         LDLM_LOCK_PUT(lock);
3290         return rc;
3291 }
3292
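/* Propagate the result of an extent lock enqueue into the stripe's lov_oinfo:
 * on success (or an intent enqueue aborted by a glimpse) copy the LVB and
 * allow the lock to be matched; for a granted lock also try to extend the
 * known minimum size (KMS) up to the end of the granted extent. */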
3293 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3294                         struct lov_oinfo *loi, int flags,
3295                         struct ost_lvb *lvb, __u32 mode, int rc)
3296 {
3297         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3298
3299         if (rc == ELDLM_OK) {
3300                 __u64 tmp;
3301
3302                 LASSERT(lock != NULL);
3303                 loi->loi_lvb = *lvb;
3304                 tmp = loi->loi_lvb.lvb_size;
3305                 /* Extend KMS up to the end of this lock and no further
3306                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3307                 if (tmp > lock->l_policy_data.l_extent.end)
3308                         tmp = lock->l_policy_data.l_extent.end + 1;
3309                 if (tmp >= loi->loi_kms) {
3310                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3311                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3312                         loi_kms_set(loi, tmp);
3313                 } else {
3314                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3315                                    LPU64"; leaving kms="LPU64", end="LPU64,
3316                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3317                                    lock->l_policy_data.l_extent.end);
3318                 }
3319                 ldlm_lock_allow_match(lock);
3320         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3321                 LASSERT(lock != NULL);
3322                 loi->loi_lvb = *lvb;
3323                 ldlm_lock_allow_match(lock);
3324                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3325                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3326                 rc = ELDLM_OK;
3327         }
3328
3329         if (lock != NULL) {
3330                 if (rc != ELDLM_OK)
3331                         ldlm_lock_fail_match(lock, rc);
3332
3333                 LDLM_LOCK_PUT(lock);
3334         }
3335 }
3336 EXPORT_SYMBOL(osc_update_enqueue);
3337
3338 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3339
3340 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
3341  * from the 2nd OSC before a lock from the 1st one.  This does not deadlock
3342  * with other synchronous requests, but holding some locks while trying to
3343  * obtain others may take a considerable amount of time in the case of OST
3344  * failure; and when other sync requests cannot get a lock released by a
3345  * client, that client is excluded from the cluster -- such scenarios make
3346  * life difficult, so release locks just after they are obtained. */
3347 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3348                      int *flags, ldlm_policy_data_t *policy,
3349                      struct ost_lvb *lvb, int kms_valid,
3350                      obd_enqueue_update_f upcall, void *cookie,
3351                      struct ldlm_enqueue_info *einfo,
3352                      struct lustre_handle *lockh,
3353                      struct ptlrpc_request_set *rqset, int async, int agl)
3354 {
3355         struct obd_device *obd = exp->exp_obd;
3356         struct ptlrpc_request *req = NULL;
3357         int intent = *flags & LDLM_FL_HAS_INTENT;
3358         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
3359         ldlm_mode_t mode;
3360         int rc;
3361         ENTRY;
3362
3363         /* Filesystem lock extents are extended to page boundaries so that
3364          * dealing with the page cache is a little smoother.  */
3365         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3366         policy->l_extent.end |= ~CFS_PAGE_MASK;
3367
3368         /*
3369          * kms is not valid when the object is completely fresh (so that no
3370          * locks are cached), or when the object was evicted. In the latter
3371          * case a cached lock cannot be used, because it would prime the
3372          * inode state with a potentially stale LVB.
3373          */
3374         if (!kms_valid)
3375                 goto no_match;
3376
3377         /* Next, search for already existing extent locks that will cover us */
3378         /* If we're trying to read, we also search for an existing PW lock.  The
3379          * VFS and page cache already protect us locally, so lots of readers/
3380          * writers can share a single PW lock.
3381          *
3382          * There are problems with conversion deadlocks, so instead of
3383          * converting a read lock to a write lock, we'll just enqueue a new
3384          * one.
3385          *
3386          * At some point we should cancel the read lock instead of making them
3387          * send us a blocking callback, but there are problems with canceling
3388          * locks out from other users right now, too. */
3389         mode = einfo->ei_mode;
3390         if (einfo->ei_mode == LCK_PR)
3391                 mode |= LCK_PW;
3392         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
3393                                einfo->ei_type, policy, mode, lockh, 0);
3394         if (mode) {
3395                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3396
3397                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
3398                         /* For AGL, if the enqueue RPC is sent but the lock
3399                          * is not granted, then skip processing this stripe.
3400                          * Return -ECANCELED to tell the caller. */
3401                         ldlm_lock_decref(lockh, mode);
3402                         LDLM_LOCK_PUT(matched);
3403                         RETURN(-ECANCELED);
3404                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
3405                         *flags |= LDLM_FL_LVB_READY;
3406                         /* addref the lock only if not async requests and PW
3407                          * lock is matched whereas we asked for PR. */
3408                         if (!rqset && einfo->ei_mode != mode)
3409                                 ldlm_lock_addref(lockh, LCK_PR);
3410                         if (intent) {
3411                                 /* I would like to be able to ASSERT here that
3412                                  * rss <= kms, but I can't, for reasons which
3413                                  * are explained in lov_enqueue() */
3414                         }
3415
3416                         /* We already have a lock, and it's referenced */
3417                         (*upcall)(cookie, ELDLM_OK);
3418
3419                         if (einfo->ei_mode != mode)
3420                                 ldlm_lock_decref(lockh, LCK_PW);
3421                         else if (rqset)
3422                                 /* For async requests, decref the lock. */
3423                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3424                         LDLM_LOCK_PUT(matched);
3425                         RETURN(ELDLM_OK);
3426                 } else {
3427                         ldlm_lock_decref(lockh, mode);
3428                         LDLM_LOCK_PUT(matched);
3429                 }
3430         }
3431
3432  no_match:
3433         if (intent) {
3434                 CFS_LIST_HEAD(cancels);
3435                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3436                                            &RQF_LDLM_ENQUEUE_LVB);
3437                 if (req == NULL)
3438                         RETURN(-ENOMEM);
3439
3440                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3441                 if (rc) {
3442                         ptlrpc_request_free(req);
3443                         RETURN(rc);
3444                 }
3445
3446                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3447                                      sizeof(*lvb));
3448                 ptlrpc_request_set_replen(req);
3449         }
3450
3451         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3452         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3453
3454         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3455                               sizeof(*lvb), lockh, async);
3456         if (rqset) {
3457                 if (!rc) {
3458                         struct osc_enqueue_args *aa;
3459                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3460                         aa = ptlrpc_req_async_args(req);
3461                         aa->oa_ei = einfo;
3462                         aa->oa_exp = exp;
3463                         aa->oa_flags  = flags;
3464                         aa->oa_upcall = upcall;
3465                         aa->oa_cookie = cookie;
3466                         aa->oa_lvb    = lvb;
3467                         aa->oa_lockh  = lockh;
3468                         aa->oa_agl    = !!agl;
3469
3470                         req->rq_interpret_reply =
3471                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3472                         if (rqset == PTLRPCD_SET)
3473                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3474                         else
3475                                 ptlrpc_set_add_req(rqset, req);
3476                 } else if (intent) {
3477                         ptlrpc_req_finished(req);
3478                 }
3479                 RETURN(rc);
3480         }
3481
3482         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
3483         if (intent)
3484                 ptlrpc_req_finished(req);
3485
3486         RETURN(rc);
3487 }
3488
3489 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3490                        struct ldlm_enqueue_info *einfo,
3491                        struct ptlrpc_request_set *rqset)
3492 {
3493         struct ldlm_res_id res_id;
3494         int rc;
3495         ENTRY;
3496
3497         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3498                            oinfo->oi_md->lsm_object_seq, &res_id);
3499
3500         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3501                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3502                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3503                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3504                               rqset, rqset != NULL, 0);
3505         RETURN(rc);
3506 }
3507
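/* Look for an already-granted extent lock covering @policy without sending an
 * enqueue RPC.  As in osc_enqueue_base(), a PR request will also match a PW
 * lock, since the VFS and page cache protect readers locally; if a PW lock is
 * matched for a PR request, the reference is switched from PW to PR. */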
3508 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3509                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3510                    int *flags, void *data, struct lustre_handle *lockh,
3511                    int unref)
3512 {
3513         struct obd_device *obd = exp->exp_obd;
3514         int lflags = *flags;
3515         ldlm_mode_t rc;
3516         ENTRY;
3517
3518         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3519                 RETURN(-EIO);
3520
3521         /* Filesystem lock extents are extended to page boundaries so that
3522          * dealing with the page cache is a little smoother */
3523         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3524         policy->l_extent.end |= ~CFS_PAGE_MASK;
3525
3526         /* Next, search for already existing extent locks that will cover us */
3527         /* If we're trying to read, we also search for an existing PW lock.  The
3528          * VFS and page cache already protect us locally, so lots of readers/
3529          * writers can share a single PW lock. */
3530         rc = mode;
3531         if (mode == LCK_PR)
3532                 rc |= LCK_PW;
3533         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3534                              res_id, type, policy, rc, lockh, unref);
3535         if (rc) {
3536                 if (data != NULL) {
3537                         if (!osc_set_data_with_check(lockh, data)) {
3538                                 if (!(lflags & LDLM_FL_TEST_LOCK))
3539                                         ldlm_lock_decref(lockh, rc);
3540                                 RETURN(0);
3541                         }
3542                 }
3543                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3544                         ldlm_lock_addref(lockh, LCK_PR);
3545                         ldlm_lock_decref(lockh, LCK_PW);
3546                 }
3547                 RETURN(rc);
3548         }
3549         RETURN(rc);
3550 }
3551
3552 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3553 {
3554         ENTRY;
3555
3556         if (unlikely(mode == LCK_GROUP))
3557                 ldlm_lock_decref_and_cancel(lockh, mode);
3558         else
3559                 ldlm_lock_decref(lockh, mode);
3560
3561         RETURN(0);
3562 }
3563
3564 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3565                       __u32 mode, struct lustre_handle *lockh)
3566 {
3567         ENTRY;
3568         RETURN(osc_cancel_base(lockh, mode));
3569 }
3570
3571 static int osc_cancel_unused(struct obd_export *exp,
3572                              struct lov_stripe_md *lsm,
3573                              ldlm_cancel_flags_t flags,
3574                              void *opaque)
3575 {
3576         struct obd_device *obd = class_exp2obd(exp);
3577         struct ldlm_res_id res_id, *resp = NULL;
3578
3579         if (lsm != NULL) {
3580                 resp = osc_build_res_name(lsm->lsm_object_id,
3581                                           lsm->lsm_object_seq, &res_id);
3582         }
3583
3584         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3585 }
3586
3587 static int osc_statfs_interpret(const struct lu_env *env,
3588                                 struct ptlrpc_request *req,
3589                                 struct osc_async_args *aa, int rc)
3590 {
3591         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3592         struct obd_statfs *msfs;
3593         __u64 used;
3594         ENTRY;
3595
3596         if (rc == -EBADR)
3597                 /* The request has in fact never been sent
3598                  * due to issues at a higher level (LOV).
3599                  * Exit immediately since the caller is
3600                  * aware of the problem and takes care
3601                  * of the clean up */
3602                  RETURN(rc);
3603
3604         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3605             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3606                 GOTO(out, rc = 0);
3607
3608         if (rc != 0)
3609                 GOTO(out, rc);
3610
3611         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3612         if (msfs == NULL) {
3613                 GOTO(out, rc = -EPROTO);
3614         }
3615
3616         /* Reinitialize the RDONLY and DEGRADED flags at the client
3617          * on each statfs, so they don't stay set permanently. */
3618         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3619
3620         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3621                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3622         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3623                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3624
3625         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3626                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3627         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3628                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3629
3630         /* Add a bit of hysteresis so this flag isn't continually flapping,
3631          * and ensure that new files don't get extremely fragmented due to
3632          * only a small amount of available space in the filesystem.
3633          * We want to set the NOSPC flag when there is less than ~0.1% free
3634          * and clear it when there is at least ~0.2% free space, so:
3635          *                   avail < ~0.1% max          max = avail + used
3636          *            1025 * avail < avail + used       used = blocks - free
3637          *            1024 * avail < used
3638          *            1024 * avail < blocks - free
3639          *                   avail < ((blocks - free) >> 10)
3640          *
3641          * On a very large disk, say 16TB, 0.1% will be 16 GB.  We don't
3642          * want to lose that amount of space, so in those cases we report
3643          * no space left if there is less than 1 GB left.               */
3644         used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3645         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3646                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3647                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3648         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3649                           (msfs->os_ffree > 64) &&
3650                           (msfs->os_bavail > (used << 1)))) {
3651                 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
3652                                              OSCC_FLAG_NOSPC_BLK);
3653         }
3654
3655         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3656                      (msfs->os_bavail < used)))
3657                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
3658
3659         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3660
3661         *aa->aa_oi->oi_osfs = *msfs;
3662 out:
3663         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3664         RETURN(rc);
3665 }
3666
3667 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3668                             __u64 max_age, struct ptlrpc_request_set *rqset)
3669 {
3670         struct ptlrpc_request *req;
3671         struct osc_async_args *aa;
3672         int                    rc;
3673         ENTRY;
3674
3675         /* We could possibly pass max_age in the request (as an absolute
3676          * timestamp or a "seconds.usec ago") so the target can avoid doing
3677          * extra calls into the filesystem if that isn't necessary (e.g.
3678          * during mount that would help a bit).  Having relative timestamps
3679          * is not so great if request processing is slow, while absolute
3680          * timestamps are not ideal because they need time synchronization. */
3681         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3682         if (req == NULL)
3683                 RETURN(-ENOMEM);
3684
3685         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3686         if (rc) {
3687                 ptlrpc_request_free(req);
3688                 RETURN(rc);
3689         }
3690         ptlrpc_request_set_replen(req);
3691         req->rq_request_portal = OST_CREATE_PORTAL;
3692         ptlrpc_at_set_req_timeout(req);
3693
3694         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3695                 /* procfs requests should not wait for statfs, to avoid a deadlock */
3696                 req->rq_no_resend = 1;
3697                 req->rq_no_delay = 1;
3698         }
3699
3700         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3701         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3702         aa = ptlrpc_req_async_args(req);
3703         aa->aa_oi = oinfo;
3704
3705         ptlrpc_set_add_req(rqset, req);
3706         RETURN(0);
3707 }
3708
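/* Synchronous counterpart of osc_statfs_async(): sends OST_STATFS and waits
 * for the reply, copying the result into @osfs.  The import reference is
 * taken under cl_sem because a disconnect may race with this call. */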
3709 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3710                       __u64 max_age, __u32 flags)
3711 {
3712         struct obd_statfs     *msfs;
3713         struct ptlrpc_request *req;
3714         struct obd_import     *imp = NULL;
3715         int rc;
3716         ENTRY;
3717
3718         /* Since the request might also come from lprocfs, we need to
3719          * sync this with client_disconnect_export (bug 15684). */
3720         cfs_down_read(&obd->u.cli.cl_sem);
3721         if (obd->u.cli.cl_import)
3722                 imp = class_import_get(obd->u.cli.cl_import);
3723         cfs_up_read(&obd->u.cli.cl_sem);
3724         if (!imp)
3725                 RETURN(-ENODEV);
3726
3727         /* We could possibly pass max_age in the request (as an absolute
3728          * timestamp or a "seconds.usec ago") so the target can avoid doing
3729          * extra calls into the filesystem if that isn't necessary (e.g.
3730          * during mount that would help a bit).  Having relative timestamps
3731          * is not so great if request processing is slow, while absolute
3732          * timestamps are not ideal because they need time synchronization. */
3733         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3734
3735         class_import_put(imp);
3736
3737         if (req == NULL)
3738                 RETURN(-ENOMEM);
3739
3740         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3741         if (rc) {
3742                 ptlrpc_request_free(req);
3743                 RETURN(rc);
3744         }
3745         ptlrpc_request_set_replen(req);
3746         req->rq_request_portal = OST_CREATE_PORTAL;
3747         ptlrpc_at_set_req_timeout(req);
3748
3749         if (flags & OBD_STATFS_NODELAY) {
3750                 /* procfs requests should not wait for statfs, to avoid a deadlock */
3751                 req->rq_no_resend = 1;
3752                 req->rq_no_delay = 1;
3753         }
3754
3755         rc = ptlrpc_queue_wait(req);
3756         if (rc)
3757                 GOTO(out, rc);
3758
3759         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3760         if (msfs == NULL) {
3761                 GOTO(out, rc = -EPROTO);
3762         }
3763
3764         *osfs = *msfs;
3765
3766         EXIT;
3767  out:
3768         ptlrpc_req_finished(req);
3769         return rc;
3770 }
3771
3772 /* Retrieve object striping information.
3773  *
3774  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3775  * the maximum number of OST indices which will fit in the user buffer.
3776  * lmm_magic must be LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3 (we only use
3777  * one slot here). */
3778 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3779 {
3780         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3781         struct lov_user_md_v3 lum, *lumk;
3782         struct lov_user_ost_data_v1 *lmm_objects;
3783         int rc = 0, lum_size;
3784         ENTRY;
3785
3786         if (!lsm)
3787                 RETURN(-ENODATA);
3788
3789         /* we only need the header part from user space to get lmm_magic and
3790          * lmm_stripe_count, (the header part is common to v1 and v3) */
3791         lum_size = sizeof(struct lov_user_md_v1);
3792         if (cfs_copy_from_user(&lum, lump, lum_size))
3793                 RETURN(-EFAULT);
3794
3795         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3796             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3797                 RETURN(-EINVAL);
3798
3799         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3800         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3801         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3802         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3803
3804         /* we can use lov_mds_md_size() to compute lum_size
3805          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3806         if (lum.lmm_stripe_count > 0) {
3807                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3808                 OBD_ALLOC(lumk, lum_size);
3809                 if (!lumk)
3810                         RETURN(-ENOMEM);
3811
3812                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3813                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3814                 else
3815                         lmm_objects = &(lumk->lmm_objects[0]);
3816                 lmm_objects->l_object_id = lsm->lsm_object_id;
3817         } else {
3818                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3819                 lumk = &lum;
3820         }
3821
3822         lumk->lmm_object_id = lsm->lsm_object_id;
3823         lumk->lmm_object_seq = lsm->lsm_object_seq;
3824         lumk->lmm_stripe_count = 1;
3825
3826         if (cfs_copy_to_user(lump, lumk, lum_size))
3827                 rc = -EFAULT;
3828
3829         if (lumk != &lum)
3830                 OBD_FREE(lumk, lum_size);
3831
3832         RETURN(rc);
3833 }
3834
3835
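/* ioctl entry point for the OSC: handles a small set of commands directly
 * (LOV configuration queries, stripe get/set, import recovery and
 * activation, quota check polling, target ping) and rejects everything else
 * with -ENOTTY.  A module reference is held across the handler. */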
3836 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3837                          void *karg, void *uarg)
3838 {
3839         struct obd_device *obd = exp->exp_obd;
3840         struct obd_ioctl_data *data = karg;
3841         int err = 0;
3842         ENTRY;
3843
3844         if (!cfs_try_module_get(THIS_MODULE)) {
3845                 CERROR("Can't get module. Is it alive?\n");
3846                 return -EINVAL;
3847         }
3848         switch (cmd) {
3849         case OBD_IOC_LOV_GET_CONFIG: {
3850                 char *buf;
3851                 struct lov_desc *desc;
3852                 struct obd_uuid uuid;
3853
3854                 buf = NULL;
3855                 len = 0;
3856                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3857                         GOTO(out, err = -EINVAL);
3858
3859                 data = (struct obd_ioctl_data *)buf;
3860
3861                 if (sizeof(*desc) > data->ioc_inllen1) {
3862                         obd_ioctl_freedata(buf, len);
3863                         GOTO(out, err = -EINVAL);
3864                 }
3865
3866                 if (data->ioc_inllen2 < sizeof(uuid)) {
3867                         obd_ioctl_freedata(buf, len);
3868                         GOTO(out, err = -EINVAL);
3869                 }
3870
3871                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3872                 desc->ld_tgt_count = 1;
3873                 desc->ld_active_tgt_count = 1;
3874                 desc->ld_default_stripe_count = 1;
3875                 desc->ld_default_stripe_size = 0;
3876                 desc->ld_default_stripe_offset = 0;
3877                 desc->ld_pattern = 0;
3878                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3879
3880                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3881
3882                 err = cfs_copy_to_user((void *)uarg, buf, len);
3883                 if (err)
3884                         err = -EFAULT;
3885                 obd_ioctl_freedata(buf, len);
3886                 GOTO(out, err);
3887         }
3888         case LL_IOC_LOV_SETSTRIPE:
3889                 err = obd_alloc_memmd(exp, karg);
3890                 if (err > 0)
3891                         err = 0;
3892                 GOTO(out, err);
3893         case LL_IOC_LOV_GETSTRIPE:
3894                 err = osc_getstripe(karg, uarg);
3895                 GOTO(out, err);
3896         case OBD_IOC_CLIENT_RECOVER:
3897                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3898                                             data->ioc_inlbuf1, 0);
3899                 if (err > 0)
3900                         err = 0;
3901                 GOTO(out, err);
3902         case IOC_OSC_SET_ACTIVE:
3903                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3904                                                data->ioc_offset);
3905                 GOTO(out, err);
3906         case OBD_IOC_POLL_QUOTACHECK:
3907                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3908                 GOTO(out, err);
3909         case OBD_IOC_PING_TARGET:
3910                 err = ptlrpc_obd_ping(obd);
3911                 GOTO(out, err);
3912         default:
3913                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3914                        cmd, cfs_curproc_comm());
3915                 GOTO(out, err = -ENOTTY);
3916         }
3917 out:
3918         cfs_module_put(THIS_MODULE);
3919         return err;
3920 }
3921
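/* obd_get_info handler.  KEY_LOCK_TO_STRIPE is trivially 0 here because an
 * OSC object maps to a single stripe; KEY_LAST_ID and KEY_FIEMAP are
 * forwarded to the OST as OST_GET_INFO requests and the reply is copied
 * back into @val. */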
3922 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3923                         void *key, __u32 *vallen, void *val,
3924                         struct lov_stripe_md *lsm)
3925 {
3926         ENTRY;
3927         if (!vallen || !val)
3928                 RETURN(-EFAULT);
3929
3930         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3931                 __u32 *stripe = val;
3932                 *vallen = sizeof(*stripe);
3933                 *stripe = 0;
3934                 RETURN(0);
3935         } else if (KEY_IS(KEY_LAST_ID)) {
3936                 struct ptlrpc_request *req;
3937                 obd_id                *reply;
3938                 char                  *tmp;
3939                 int                    rc;
3940
3941                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3942                                            &RQF_OST_GET_INFO_LAST_ID);
3943                 if (req == NULL)
3944                         RETURN(-ENOMEM);
3945
3946                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3947                                      RCL_CLIENT, keylen);
3948                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3949                 if (rc) {
3950                         ptlrpc_request_free(req);
3951                         RETURN(rc);
3952                 }
3953
3954                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3955                 memcpy(tmp, key, keylen);
3956
3957                 req->rq_no_delay = req->rq_no_resend = 1;
3958                 ptlrpc_request_set_replen(req);
3959                 rc = ptlrpc_queue_wait(req);
3960                 if (rc)
3961                         GOTO(out, rc);
3962
3963                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3964                 if (reply == NULL)
3965                         GOTO(out, rc = -EPROTO);
3966
3967                 *((obd_id *)val) = *reply;
3968         out:
3969                 ptlrpc_req_finished(req);
3970                 RETURN(rc);
3971         } else if (KEY_IS(KEY_FIEMAP)) {
3972                 struct ptlrpc_request *req;
3973                 struct ll_user_fiemap *reply;
3974                 char *tmp;
3975                 int rc;
3976
3977                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3978                                            &RQF_OST_GET_INFO_FIEMAP);
3979                 if (req == NULL)
3980                         RETURN(-ENOMEM);
3981
3982                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3983                                      RCL_CLIENT, keylen);
3984                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3985                                      RCL_CLIENT, *vallen);
3986                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3987                                      RCL_SERVER, *vallen);
3988
3989                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3990                 if (rc) {
3991                         ptlrpc_request_free(req);
3992                         RETURN(rc);
3993                 }
3994
3995                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3996                 memcpy(tmp, key, keylen);
3997                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3998                 memcpy(tmp, val, *vallen);
3999
4000                 ptlrpc_request_set_replen(req);
4001                 rc = ptlrpc_queue_wait(req);
4002                 if (rc)
4003                         GOTO(out1, rc);
4004
4005                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4006                 if (reply == NULL)
4007                         GOTO(out1, rc = -EPROTO);
4008
4009                 memcpy(val, reply, *vallen);
4010         out1:
4011                 ptlrpc_req_finished(req);
4012
4013                 RETURN(rc);
4014         }
4015
4016         RETURN(-EINVAL);
4017 }
4018
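/* Connect the llog initiator to the MDS-OST origin context and mark the
 * import as pingable with server-driven timeouts; invoked once an MDS
 * connection has been announced via a KEY_MDS_CONN set_info RPC. */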
4019 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4020 {
4021         struct llog_ctxt *ctxt;
4022         int rc = 0;
4023         ENTRY;
4024
4025         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4026         if (ctxt) {
4027                 rc = llog_initiator_connect(ctxt);
4028                 llog_ctxt_put(ctxt);
4029         } else {
4030                 /* XXX return an error? skip setting below flags? */
4031         }
4032
4033         cfs_spin_lock(&imp->imp_lock);
4034         imp->imp_server_timeout = 1;
4035         imp->imp_pingable = 1;
4036         cfs_spin_unlock(&imp->imp_lock);
4037         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
4038
4039         RETURN(rc);
4040 }
4041
4042 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4043                                           struct ptlrpc_request *req,
4044                                           void *aa, int rc)
4045 {
4046         ENTRY;
4047         if (rc != 0)
4048                 RETURN(rc);
4049
4050         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
4051 }
4052
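/* Handle obd_set_info_async() keys. KEY_NEXT_ID, KEY_CHECKSUM,
 * KEY_SPTLRPC_CONF and KEY_FLUSH_CTX are consumed locally; everything else
 * is packed into an OST_SET_INFO RPC and queued on the caller's request
 * set, except KEY_GRANT_SHRINK, which is handed to ptlrpcd instead.
 *
 * A caller sketch (illustration only, assuming a valid export "exp"):
 *
 *      int on = 1;
 *
 *      rc = obd_set_info_async(exp, sizeof(KEY_CHECKSUM), KEY_CHECKSUM,
 *                              sizeof(on), &on, NULL);
 */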
4053 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4054                               void *key, obd_count vallen, void *val,
4055                               struct ptlrpc_request_set *set)
4056 {
4057         struct ptlrpc_request *req;
4058         struct obd_device     *obd = exp->exp_obd;
4059         struct obd_import     *imp = class_exp2cliimp(exp);
4060         char                  *tmp;
4061         int                    rc;
4062         ENTRY;
4063
4064         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4065
4066         if (KEY_IS(KEY_NEXT_ID)) {
4067                 obd_id new_val;
4068                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4069
                if (vallen != sizeof(obd_id))
                        RETURN(-ERANGE);
                if (val == NULL)
                        RETURN(-EINVAL);
4077
                /* Avoid a race between allocating a new object and setting
                 * the next id from the ll_sync thread. */
4080                 cfs_spin_lock(&oscc->oscc_lock);
4081                 new_val = *((obd_id*)val) + 1;
4082                 if (new_val > oscc->oscc_next_id)
4083                         oscc->oscc_next_id = new_val;
4084                 cfs_spin_unlock(&oscc->oscc_lock);
4085                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4086                        exp->exp_obd->obd_name,
4087                        obd->u.cli.cl_oscc.oscc_next_id);
4088
4089                 RETURN(0);
4090         }
4091
4092         if (KEY_IS(KEY_CHECKSUM)) {
4093                 if (vallen != sizeof(int))
4094                         RETURN(-EINVAL);
4095                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4096                 RETURN(0);
4097         }
4098
4099         if (KEY_IS(KEY_SPTLRPC_CONF)) {
4100                 sptlrpc_conf_client_adapt(obd);
4101                 RETURN(0);
4102         }
4103
4104         if (KEY_IS(KEY_FLUSH_CTX)) {
4105                 sptlrpc_import_flush_my_ctx(imp);
4106                 RETURN(0);
4107         }
4108
4109         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4110                 RETURN(-EINVAL);
4111
        /* We pass all other commands directly to the OST. Since nobody calls
           osc methods directly and everybody is supposed to go through LOV,
           we assume LOV validated the values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad gets through, we'd get an -EINVAL from the
           OST anyway. */
4118
4119         if (KEY_IS(KEY_GRANT_SHRINK))
4120                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4121         else
4122                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4123
4124         if (req == NULL)
4125                 RETURN(-ENOMEM);
4126
4127         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4128                              RCL_CLIENT, keylen);
4129         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4130                              RCL_CLIENT, vallen);
4131         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4132         if (rc) {
4133                 ptlrpc_request_free(req);
4134                 RETURN(rc);
4135         }
4136
4137         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4138         memcpy(tmp, key, keylen);
4139         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4140         memcpy(tmp, val, vallen);
4141
4142         if (KEY_IS(KEY_MDS_CONN)) {
4143                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4144
4145                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4146                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4147                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4148                 req->rq_no_delay = req->rq_no_resend = 1;
4149                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4150         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4151                 struct osc_grant_args *aa;
4152                 struct obdo *oa;
4153
4154                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4155                 aa = ptlrpc_req_async_args(req);
4156                 OBDO_ALLOC(oa);
4157                 if (!oa) {
4158                         ptlrpc_req_finished(req);
4159                         RETURN(-ENOMEM);
4160                 }
4161                 *oa = ((struct ost_body *)val)->oa;
4162                 aa->aa_oa = oa;
4163                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4164         }
4165
4166         ptlrpc_request_set_replen(req);
4167         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4168                 LASSERT(set != NULL);
4169                 ptlrpc_set_add_req(set, req);
4170                 ptlrpc_check_set(NULL, set);
4171         } else
4172                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
4173
4174         RETURN(0);
4175 }
4176
4177
4178 static struct llog_operations osc_size_repl_logops = {
4179         lop_cancel: llog_obd_repl_cancel
4180 };
4181
4182 static struct llog_operations osc_mds_ost_orig_logops;
4183
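/* Set up the two llog contexts used by the OSC: the MDS-OST origin context
 * for object create/unlink records and the size-change replication context.
 * If the second setup fails, the first context is cleaned up again so that
 * nothing leaks. */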
4184 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4185                            struct obd_device *tgt, struct llog_catid *catid)
4186 {
4187         int rc;
4188         ENTRY;
4189
4190         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4191                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4192         if (rc) {
4193                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4194                 GOTO(out, rc);
4195         }
4196
4197         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4198                         NULL, &osc_size_repl_logops);
4199         if (rc) {
4200                 struct llog_ctxt *ctxt =
4201                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4202                 if (ctxt)
4203                         llog_cleanup(ctxt);
4204                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4205         }
4206         GOTO(out, rc);
4207 out:
4208         if (rc) {
4209                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4210                        obd->obd_name, tgt->obd_name, catid, rc);
4211                 CERROR("logid "LPX64":0x%x\n",
4212                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4213         }
4214         return rc;
4215 }
4216
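/* Read this target's catalog id from the CATLIST file, initialize the llog
 * contexts with it, and write the (possibly updated) id back. Serialized by
 * olg_cat_processing against concurrent users of the catalog list. */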
4217 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4218                          struct obd_device *disk_obd, int *index)
4219 {
4220         struct llog_catid catid;
4221         static char name[32] = CATLIST;
4222         int rc;
4223         ENTRY;
4224
4225         LASSERT(olg == &obd->obd_olg);
4226
4227         cfs_mutex_down(&olg->olg_cat_processing);
4228         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4229         if (rc) {
                CERROR("%s: getting catalog list failed: rc = %d\n",
                       obd->obd_name, rc);
4231                 GOTO(out, rc);
4232         }
4233
4234         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4235                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4236                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4237
4238         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4239         if (rc) {
                CERROR("%s: llog setup failed: rc = %d\n",
                       obd->obd_name, rc);
4241                 GOTO(out, rc);
4242         }
4243
4244         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4245         if (rc) {
                CERROR("%s: writing catalog list failed: rc = %d\n",
                       obd->obd_name, rc);
4247                 GOTO(out, rc);
4248         }
4249
4250  out:
4251         cfs_mutex_up(&olg->olg_cat_processing);
4252
4253         return rc;
4254 }
4255
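/* Tear down both llog contexts; the first nonzero cleanup result wins. */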
4256 static int osc_llog_finish(struct obd_device *obd, int count)
4257 {
4258         struct llog_ctxt *ctxt;
4259         int rc = 0, rc2 = 0;
4260         ENTRY;
4261
4262         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4263         if (ctxt)
4264                 rc = llog_cleanup(ctxt);
4265
4266         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4267         if (ctxt)
4268                 rc2 = llog_cleanup(ctxt);
4269         if (!rc)
4270                 rc = rc2;
4271
4272         RETURN(rc);
4273 }
4274
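/* Recompute the grant to ask for when the import reconnects to the OST. */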
4275 static int osc_reconnect(const struct lu_env *env,
4276                          struct obd_export *exp, struct obd_device *obd,
4277                          struct obd_uuid *cluuid,
4278                          struct obd_connect_data *data,
4279                          void *localdata)
4280 {
        struct client_obd *cli = &obd->u.cli;
        ENTRY;

4283         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4284                 long lost_grant;
4285
4286                 client_obd_list_lock(&cli->cl_loi_list_lock);
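                /* Ask for our outstanding grant back: whatever is still
                 * available plus what is pinned under dirty pages. If that
                 * is zero, fall back to two full RPCs worth of pages;
                 * "a ?: b" is the GCC shorthand for "a ? a : b". */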
4287                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4288                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4289                 lost_grant = cli->cl_lost_grant;
4290                 cli->cl_lost_grant = 0;
4291                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4292
4293                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4294                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4295                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4296                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4297                        " ocd_grant: %d\n", data->ocd_connect_flags,
4298                        data->ocd_version, data->ocd_grant);
4299         }
4300
4301         RETURN(0);
4302 }
4303
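/* Flush any pending size-change llog cancels to the OST on the last
 * disconnect, then disconnect the export; removal from the grant shrink
 * list is deferred until the import is gone (see the comment below). */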
4304 static int osc_disconnect(struct obd_export *exp)
4305 {
4306         struct obd_device *obd = class_exp2obd(exp);
4307         struct llog_ctxt  *ctxt;
4308         int rc;
4309
4310         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4311         if (ctxt) {
4312                 if (obd->u.cli.cl_conn_count == 1) {
4313                         /* Flush any remaining cancel messages out to the
4314                          * target */
4315                         llog_sync(ctxt, exp);
4316                 }
4317                 llog_ctxt_put(ctxt);
4318         } else {
4319                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4320                        obd);
4321         }
4322
4323         rc = client_disconnect_export(exp);
4324         /**
4325          * Initially we put del_shrink_grant before disconnect_export, but it
4326          * causes the following problem if setup (connect) and cleanup
4327          * (disconnect) are tangled together.
4328          *      connect p1                     disconnect p2
4329          *   ptlrpc_connect_import
4330          *     ...............               class_manual_cleanup
4331          *                                     osc_disconnect
4332          *                                     del_shrink_grant
4333          *   ptlrpc_connect_interrupt
4334          *     init_grant_shrink
4335          *   add this client to shrink list
4336          *                                      cleanup_osc
         * Bang! the pinger triggers the shrink.
         * So the osc should be removed from the shrink list only after we
         * are sure the import has been destroyed. BUG18662
4340          */
4341         if (obd->u.cli.cl_import == NULL)
4342                 osc_del_shrink_grant(&obd->u.cli);
4343         return rc;
4344 }
4345
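/* Propagate import state changes to the grant accounting, the object
 * creator and the obd observer (normally the LOV above us). */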
4346 static int osc_import_event(struct obd_device *obd,
4347                             struct obd_import *imp,
4348                             enum obd_import_event event)
4349 {
4350         struct client_obd *cli;
4351         int rc = 0;
4352
4353         ENTRY;
4354         LASSERT(imp->imp_obd == obd);
4355
4356         switch (event) {
4357         case IMP_EVENT_DISCON: {
                /* Only do this for the MDS OSCs. */
4359                 if (imp->imp_server_timeout) {
4360                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4361
4362                         cfs_spin_lock(&oscc->oscc_lock);
4363                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4364                         cfs_spin_unlock(&oscc->oscc_lock);
4365                 }
4366                 cli = &obd->u.cli;
4367                 client_obd_list_lock(&cli->cl_loi_list_lock);
4368                 cli->cl_avail_grant = 0;
4369                 cli->cl_lost_grant = 0;
4370                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4371                 break;
4372         }
4373         case IMP_EVENT_INACTIVE: {
4374                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4375                 break;
4376         }
4377         case IMP_EVENT_INVALIDATE: {
4378                 struct ldlm_namespace *ns = obd->obd_namespace;
4379                 struct lu_env         *env;
4380                 int                    refcheck;
4381
4382                 env = cl_env_get(&refcheck);
4383                 if (!IS_ERR(env)) {
4384                         /* Reset grants */
4385                         cli = &obd->u.cli;
4386                         client_obd_list_lock(&cli->cl_loi_list_lock);
                        /* All pages will go to failing RPCs because the
                         * import is invalid. */
4389                         osc_check_rpcs(env, cli);
4390                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4391
4392                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4393                         cl_env_put(env, &refcheck);
4394                 } else
4395                         rc = PTR_ERR(env);
4396                 break;
4397         }
4398         case IMP_EVENT_ACTIVE: {
                /* Only do this for the MDS OSCs. */
4400                 if (imp->imp_server_timeout) {
4401                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4402
4403                         cfs_spin_lock(&oscc->oscc_lock);
4404                         oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
4405                                               OSCC_FLAG_NOSPC_BLK);
4406                         cfs_spin_unlock(&oscc->oscc_lock);
4407                 }
4408                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4409                 break;
4410         }
4411         case IMP_EVENT_OCD: {
4412                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4413
4414                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4415                         osc_init_grant(&obd->u.cli, ocd);
4416
4417                 /* See bug 7198 */
4418                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =
                                OST_REQUEST_PORTAL;
4420
4421                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4422                 break;
4423         }
4424         case IMP_EVENT_DEACTIVATE: {
4425                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
4426                 break;
4427         }
4428         case IMP_EVENT_ACTIVATE: {
4429                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
4430                 break;
4431         }
4432         default:
4433                 CERROR("Unknown import event %d\n", event);
4434                 LBUG();
4435         }
4436         RETURN(rc);
4437 }
4438
4439 /**
4440  * Determine whether the lock can be canceled before replaying the lock
4441  * during recovery, see bug16774 for detailed information.
4442  *
4443  * \retval zero the lock can't be canceled
4444  * \retval other ok to cancel
4445  */
4446 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4447 {
4448         check_res_locked(lock->l_resource);
4449
        /*
         * Cancel all unused extent locks granted in mode LCK_PR or LCK_CR.
         *
         * XXX as a future improvement, we could also cancel an unused write
         * lock if it has no dirty data and no active mmaps.
         */
4456         if (lock->l_resource->lr_type == LDLM_EXTENT &&
4457             (lock->l_granted_mode == LCK_PR ||
4458              lock->l_granted_mode == LCK_CR) &&
4459             (osc_dlm_lock_pageref(lock) == 0))
4460                 RETURN(1);
4461
4462         RETURN(0);
4463 }
4464
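/* Set up the OSC device: take a ptlrpcd reference, run the generic client
 * setup, create the writeback work item, register lprocfs entries, start
 * the object creator, and pre-allocate a small emergency request pool so
 * brw_interpret can always make progress. */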
4465 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4466 {
4467         struct client_obd *cli = &obd->u.cli;
4468         int rc;
        ENTRY;

4472         rc = ptlrpcd_addref();
4473         if (rc)
4474                 RETURN(rc);
4475
4476         rc = client_obd_setup(obd, lcfg);
4477         if (rc == 0) {
4478                 void *handler;
4479                 handler = ptlrpcd_alloc_work(cli->cl_import,
4480                                              brw_queue_work, cli);
4481                 if (!IS_ERR(handler))
4482                         cli->cl_writeback_work = handler;
4483                 else
4484                         rc = PTR_ERR(handler);
4485         }
4486
4487         if (rc == 0) {
4488                 struct lprocfs_static_vars lvars = { 0 };
4489
4490                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4491                 lprocfs_osc_init_vars(&lvars);
4492                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4493                         lproc_osc_attach_seqstat(obd);
4494                         sptlrpc_lprocfs_cliobd_attach(obd);
4495                         ptlrpc_lprocfs_register_obd(obd);
4496                 }
4497
4498                 oscc_init(obd);
                /* We need to allocate a few extra requests, because
                   brw_interpret tries to create new requests before freeing
                   previous ones. Ideally we would reserve 2x
                   max_rpcs_in_flight, but that might waste too much RAM,
                   so 2 is just a guess that should still work. */
4504                 cli->cl_import->imp_rq_pool =
4505                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4506                                             OST_MAXREQSIZE,
4507                                             ptlrpc_add_rqs_to_pool);
4508
4509                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4510                 cfs_sema_init(&cli->cl_grant_sem, 1);
4511
4512                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4513         }
4514
4515         if (rc)
4516                 ptlrpcd_decref();
4517         RETURN(rc);
4518 }
4519
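/* Two-stage teardown: OBD_CLEANUP_EARLY deactivates the import and stops
 * pinging it; OBD_CLEANUP_EXPORTS waits out zombie exports, destroys the
 * writeback work item and unwinds lprocfs and the llog contexts. */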
4520 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4521 {
4522         int rc = 0;
4523         ENTRY;
4524
4525         switch (stage) {
4526         case OBD_CLEANUP_EARLY: {
4527                 struct obd_import *imp;
4528                 imp = obd->u.cli.cl_import;
4529                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4530                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4531                 ptlrpc_deactivate_import(imp);
4532                 cfs_spin_lock(&imp->imp_lock);
4533                 imp->imp_pingable = 0;
4534                 cfs_spin_unlock(&imp->imp_lock);
4535                 break;
4536         }
4537         case OBD_CLEANUP_EXPORTS: {
4538                 struct client_obd *cli = &obd->u.cli;
4539                 /* LU-464
4540                  * for echo client, export may be on zombie list, wait for
4541                  * zombie thread to cull it, because cli.cl_import will be
4542                  * cleared in client_disconnect_export():
4543                  *   class_export_destroy() -> obd_cleanup() ->
4544                  *   echo_device_free() -> echo_client_cleanup() ->
4545                  *   obd_disconnect() -> osc_disconnect() ->
4546                  *   client_disconnect_export()
4547                  */
4548                 obd_zombie_barrier();
4549                 if (cli->cl_writeback_work) {
4550                         ptlrpcd_destroy_work(cli->cl_writeback_work);
4551                         cli->cl_writeback_work = NULL;
4552                 }
4553                 obd_cleanup_client_import(obd);
4554                 ptlrpc_lprocfs_unregister_obd(obd);
4555                 lprocfs_obd_cleanup(obd);
4556                 rc = obd_llog_finish(obd, 0);
4557                 if (rc != 0)
4558                         CERROR("failed to cleanup llogging subsystems\n");
4559                 break;
        }
4561         }
4562         RETURN(rc);
4563 }
4564
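/* Final cleanup: release the quota cache, the generic client state and our
 * ptlrpcd reference. */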
4565 int osc_cleanup(struct obd_device *obd)
4566 {
4567         int rc;
4568
4569         ENTRY;
4570
4571         /* free memory of osc quota cache */
4572         osc_quota_cleanup(obd);
4573
4574         rc = client_obd_cleanup(obd);
4575
4576         ptlrpcd_decref();
4577         RETURN(rc);
4578 }
4579
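/* Process a config log command; anything unrecognised is treated as a proc
 * parameter for the osc_* tunables. */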
4580 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4581 {
4582         struct lprocfs_static_vars lvars = { 0 };
4583         int rc = 0;
4584
4585         lprocfs_osc_init_vars(&lvars);
4586
4587         switch (lcfg->lcfg_command) {
4588         default:
4589                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4590                                               lcfg, obd);
4591                 if (rc > 0)
4592                         rc = 0;
4593                 break;
4594         }
4595
4596         return(rc);
4597 }
4598
4599 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4600 {
4601         return osc_process_config_base(obd, buf);
4602 }
4603
4604 struct obd_ops osc_obd_ops = {
4605         .o_owner                = THIS_MODULE,
4606         .o_setup                = osc_setup,
4607         .o_precleanup           = osc_precleanup,
4608         .o_cleanup              = osc_cleanup,
4609         .o_add_conn             = client_import_add_conn,
4610         .o_del_conn             = client_import_del_conn,
4611         .o_connect              = client_connect_import,
4612         .o_reconnect            = osc_reconnect,
4613         .o_disconnect           = osc_disconnect,
4614         .o_statfs               = osc_statfs,
4615         .o_statfs_async         = osc_statfs_async,
4616         .o_packmd               = osc_packmd,
4617         .o_unpackmd             = osc_unpackmd,
4618         .o_precreate            = osc_precreate,
4619         .o_create               = osc_create,
4620         .o_create_async         = osc_create_async,
4621         .o_destroy              = osc_destroy,
4622         .o_getattr              = osc_getattr,
4623         .o_getattr_async        = osc_getattr_async,
4624         .o_setattr              = osc_setattr,
4625         .o_setattr_async        = osc_setattr_async,
4626         .o_brw                  = osc_brw,
4627         .o_punch                = osc_punch,
4628         .o_sync                 = osc_sync,
4629         .o_enqueue              = osc_enqueue,
4630         .o_change_cbdata        = osc_change_cbdata,
4631         .o_find_cbdata          = osc_find_cbdata,
4632         .o_cancel               = osc_cancel,
4633         .o_cancel_unused        = osc_cancel_unused,
4634         .o_iocontrol            = osc_iocontrol,
4635         .o_get_info             = osc_get_info,
4636         .o_set_info_async       = osc_set_info_async,
4637         .o_import_event         = osc_import_event,
4638         .o_llog_init            = osc_llog_init,
4639         .o_llog_finish          = osc_llog_finish,
4640         .o_process_config       = osc_process_config,
4641         .o_quotactl             = osc_quotactl,
4642         .o_quotacheck           = osc_quotacheck,
4643         .o_quota_adjust_qunit   = osc_quota_adjust_qunit,
4644 };
4645
4646 extern struct lu_kmem_descr osc_caches[];
4647 extern cfs_spinlock_t       osc_ast_guard;
4648 extern cfs_lock_class_key_t osc_ast_guard_class;
4649
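/* Module entry point: register the OSC slab caches and obd type, then wire
 * the MDS-OST origin llog operations to the lvfs-backed implementations. */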
4650 int __init osc_init(void)
4651 {
4652         struct lprocfs_static_vars lvars = { 0 };
4653         int rc;
4654         ENTRY;
4655
        /* Print the address of _any_ initialized kernel symbol from this
         * module, to allow debugging with a gdb that doesn't support data
         * symbols from modules. */
4659         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
4660
        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

4663         lprocfs_osc_init_vars(&lvars);
4664
4665         osc_quota_init();
4666         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4667                                  LUSTRE_OSC_NAME, &osc_device_type);
4668         if (rc) {
4669                 lu_kmem_fini(osc_caches);
4670                 RETURN(rc);
4671         }
4672
4673         cfs_spin_lock_init(&osc_ast_guard);
4674         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4675
4676         osc_mds_ost_orig_logops = llog_lvfs_ops;
4677         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4678         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4679         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4680         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4681
4682         RETURN(rc);
4683 }
4684
4685 #ifdef __KERNEL__
4686 static void /*__exit*/ osc_exit(void)
4687 {
4688         lu_device_type_fini(&osc_device_type);
4689
4690         osc_quota_exit();
4691         class_unregister_type(LUSTRE_OSC_NAME);
4692         lu_kmem_fini(osc_caches);
4693 }
4694
4695 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4696 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4697 MODULE_LICENSE("GPL");
4698
4699 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4700 #endif