/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli,
                            int ptlrpc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
        }

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
        }

        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

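/* Completion callback for an async OST_GETATTR RPC: unpack the reply body
 * into the caller's obdo and pass the result to the oi_cb_up upcall. */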
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

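/* Queue an OST_GETATTR RPC on @set; the reply is handled by
 * osc_getattr_interpret() when the set is processed. */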
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

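/* Synchronous getattr: send OST_GETATTR and wait for the reply. */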
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

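/* Synchronous setattr: send OST_SETATTR and wait for the reply. */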
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

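/* Completion callback for an async OST_SETATTR (or OST_PUNCH) RPC: unpack
 * the reply and pass the result to the caller's sa_upcall. */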
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(sa->sa_oa, &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

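/* Send an OST_SETATTR RPC asynchronously.  If @rqset is NULL the request is
 * fired and forgotten via ptlrpcd; if it is PTLRPCD_SET it is handed to
 * ptlrpcd with an interpret callback; otherwise it is added to the caller's
 * request set. */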
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* Do the MDS-to-OST setattr asynchronously. */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}

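/* Create an object on the OST.  Allocates a stripe MD if the caller did not
 * pass one, sends OST_CREATE synchronously, and copies the returned object
 * id/seq (and any llog cookie) back to the caller. */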
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_seq = oa->o_seq;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

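/* Send an asynchronous OST_PUNCH (truncate) request; completion is reported
 * through @upcall via osc_setattr_interpret(). */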
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_punch(const struct lu_env *env, struct obd_export *exp,
                     struct obd_info *oinfo, struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo,
                              oinfo->oi_cb_up, oinfo, rqset);
}

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_async_args *aa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_oa = body->oa;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

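/* Queue an OST_SYNC RPC for the [start, end] byte range on @set. */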
static int osc_sync(const struct lu_env *env, struct obd_export *exp,
                    struct obd_info *oinfo, obd_size start, obd_size end,
                    struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        if (!oinfo->oi_oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN (0);
}

/* Locally find and cancel the locks matching @mode in the resource named
 * after @oa. Matched locks are added to the @cancels list. Returns the
 * number of locks added to the list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   cfs_list_t *cancels,
                                   ldlm_mode_t mode, int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        cfs_atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

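/* Check whether another destroy RPC may be sent without exceeding
 * cl_max_rpcs_in_flight; reserves a slot in the counter on success. */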
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything at
 * all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * that they were destroyed and sync'd to disk (i.e. the transaction
 * committed).  If the client dies, or the OST is down when the object should
 * be destroyed, the records are not cancelled, and when the OST next
 * reconnects to the MDS, it will retrieve the llog unlink logs and then send
 * the log cancellation cookies to the MDS after committing the destroy
 * transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct lov_stripe_md *ea,
                       struct obd_trans_info *oti, struct obd_export *md_export,
                       void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}

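/* Fill in the dirty/undirty/grant fields of @oa so the OST can see how much
 * cache the client holds and how much more grant it wants. */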
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (cfs_atomic_read(&obd_dirty_pages) -
                   cfs_atomic_read(&obd_dirty_transit_pages) >
                   obd_max_dirty_pages + 1) {
                /* The cfs_atomic_read() calls and the cfs_atomic_inc() are
                 * not covered by a lock, so they may race harmlessly and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
               cli->cl_next_shrink_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        cfs_atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}

/* The companion to osc_consume_write_grant, called when a brw has completed.
 * Must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        cfs_atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                cfs_atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                /* Reclaim grant from truncated pages. This is used to solve
                 * the problem of a write being truncated with all of its
                 * grant gone (to lost_grant). For a VFS write this can easily
                 * be solved by a sync write; however, that is not an option
                 * for page_mkwrite() because grant has to be allocated
                 * before a page becomes dirty. */
                if (cli->cl_avail_grant < PTLRPC_MAX_BRW_SIZE)
                        cli->cl_avail_grant += CFS_PAGE_SIZE;
                else
                        cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
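                /* Worked example (assuming a 4096-byte CFS_PAGE_SIZE and a
                 * 1024-byte OST blocksize): a 100-byte write at page offset 0
                 * gives count = 100, end = 100, so count rounds up to one
                 * full 1024-byte block and 4096 - 1024 = 3072 bytes of grant
                 * are accounted to cl_lost_grant. */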
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        cfs_list_t *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (cfs_atomic_read(&obd_dirty_pages) + 1 >
                    obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if there is still dirty cache but no grant, wait for
                 * pending RPCs that may yet return some grant before
                 * falling back to sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
                cfs_list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld\n",
                       ocw, ocw->ocw_oap, cli->cl_avail_grant);

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        long target = (cli->cl_max_rpcs_in_flight + 1) *
                      cli->cl_max_pages_per_rpc;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target)
                target = cli->cl_max_pages_per_rpc;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target);
}

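/* Release grant above @target back to the OST by sending a KEY_GRANT_SHRINK
 * set_info RPC; on failure the shrunk grant is restored locally. */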
int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int    rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        cfs_list_for_each_entry(client, &item->ti_obd_list,
                                cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                        client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s \n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

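/* Initialize cl_avail_grant from the grant returned at connect time,
 * accounting for any cache that was dirty before reconnection. */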
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it is the new avail_grant amount, and cl_dirty will
         * drop to 0 as in-flight RPCs fail out; otherwise, it is avail_grant
         * + dirty.
         *
         * A race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0, the OSS is probably not running"
                      " with patch from bug20278 (%ld) \n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
                /* workaround for 1.6 servers which do not have
                 * the patch from bug20278 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
               cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            cfs_list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume this OSC got a short read because it read beyond the end of a
 * stripe file; i.e. Lustre is reading a sparse file via the LOV, which
 * _knows_ the read is inside the file.  This stripe simply has never been
 * written at or beyond this offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT (page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}

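/* Verify the per-niobuf return codes and transferred byte count in a
 * BRW_WRITE reply; returns 0, an niobuf error, or -EPROTO. */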
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

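/* Two brw_pages can be merged into one niobuf if their flags match and they
 * are contiguous in file offset. */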
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
                                  OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at http://bugs.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

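/* Compute the bulk checksum over the first @nob bytes of @pga using
 * @cksum_type; fault injection may deliberately corrupt the data (reads)
 * or the checksum (writes). */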
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT (pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute a wrong checksum instead of
         * corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return fini_checksum(cksum, cksum_type);
}

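/* Build an OST_READ/OST_WRITE RPC and its bulk descriptor for @page_count
 * pages, merging contiguous pages into shared niobufs. */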
static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT (page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of the page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);
1475
1476         *reqp = req;
1477         RETURN(0);
1478
1479  out:
1480         ptlrpc_req_finished(req);
1481         RETURN(rc);
1482 }
1483
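/*
 * Diagnose a BAD WRITE CHECKSUM by recomputing the checksum over the pages
 * the client still holds and comparing it with both sides.  A sketch of the
 * decision table implemented below:
 *
 *   server's cksum_type != ours      -> protocol problem on the server
 *   recomputed == server's checksum  -> the page changed after we
 *                                       checksummed it (e.g. mmap IO racing
 *                                       with the RPC)
 *   recomputed == our original       -> data was corrupted in transit
 *   none of the above                -> changed both locally and in transit
 */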
1484 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1485                                 __u32 client_cksum, __u32 server_cksum, int nob,
1486                                 obd_count page_count, struct brw_page **pga,
1487                                 cksum_type_t client_cksum_type)
1488 {
1489         __u32 new_cksum;
1490         char *msg;
1491         cksum_type_t cksum_type;
1492
1493         if (server_cksum == client_cksum) {
1494                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1495                 return 0;
1496         }
1497
1498         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1499                                        oa->o_flags : 0);
1500         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1501                                       cksum_type);
1502
1503         if (cksum_type != client_cksum_type)
1504                 msg = "the server did not use the checksum type specified in "
1505                       "the original request - likely a protocol problem";
1506         else if (new_cksum == server_cksum)
1507                 msg = "changed on the client after we checksummed it - "
1508                       "likely false positive due to mmap IO (bug 11742)";
1509         else if (new_cksum == client_cksum)
1510                 msg = "changed in transit before arrival at OST";
1511         else
1512                 msg = "changed in transit AND doesn't match the original - "
1513                       "likely false positive due to mmap IO (bug 11742)";
1514
1515         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1516                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1517                            msg, libcfs_nid2str(peer->nid),
1518                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1519                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1520                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1521                            oa->o_id,
1522                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1523                            pga[0]->off,
1524                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1525         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1526                "client csum now %x\n", client_cksum, client_cksum_type,
1527                server_cksum, cksum_type, new_cksum);
1528         return 1;
1529 }
1530
/* Note: rc enters this function as the number of bytes transferred */
1532 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1533 {
1534         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1535         const lnet_process_id_t *peer =
1536                         &req->rq_import->imp_connection->c_peer;
1537         struct client_obd *cli = aa->aa_cli;
1538         struct ost_body *body;
1539         __u32 client_cksum = 0;
1540         ENTRY;
1541
1542         if (rc < 0 && rc != -EDQUOT) {
1543                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1544                 RETURN(rc);
1545         }
1546
1547         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1548         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1549         if (body == NULL) {
1550                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1551                 RETURN(-EPROTO);
1552         }
1553
        /* set/clear the over-quota flag for a uid/gid */
1555         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1556             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1557                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1558
1559                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1560                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1561                        body->oa.o_flags);
1562                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1563         }
1564
1565         osc_update_grant(cli, body);
1566
1567         if (rc < 0)
1568                 RETURN(rc);
1569
1570         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1571                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1572
1573         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1574                 if (rc > 0) {
1575                         CERROR("Unexpected +ve rc %d\n", rc);
1576                         RETURN(-EPROTO);
1577                 }
1578                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1579
1580                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1581                         RETURN(-EAGAIN);
1582
1583                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1584                     check_write_checksum(&body->oa, peer, client_cksum,
1585                                          body->oa.o_cksum, aa->aa_requested_nob,
1586                                          aa->aa_page_count, aa->aa_ppga,
1587                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1588                         RETURN(-EAGAIN);
1589
                rc = check_write_rcs(req, aa->aa_requested_nob,
                                     aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
1592                 GOTO(out, rc);
1593         }
1594
1595         /* The rest of this function executes only for OST_READs */
1596
1597         /* if unwrap_bulk failed, return -EAGAIN to retry */
1598         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1599         if (rc < 0)
1600                 GOTO(out, rc = -EAGAIN);
1601
1602         if (rc > aa->aa_requested_nob) {
1603                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1604                        aa->aa_requested_nob);
1605                 RETURN(-EPROTO);
1606         }
1607
        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);
                RETURN(-EPROTO);
        }
1613
1614         if (rc < aa->aa_requested_nob)
1615                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1616
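        /* Verify a READ reply: recompute the checksum over the rc bytes the
         * server actually returned, using the checksum type it packed into
         * o_flags, and fail with -EAGAIN on mismatch so the bulk is retried. */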
1617         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1618                 static int cksum_counter;
1619                 __u32      server_cksum = body->oa.o_cksum;
1620                 char      *via;
1621                 char      *router;
1622                 cksum_type_t cksum_type;
1623
                cksum_type = cksum_type_unpack((body->oa.o_valid &
                                                OBD_MD_FLFLAGS) ?
                                               body->oa.o_flags : 0);
1626                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1627                                                  aa->aa_ppga, OST_READ,
1628                                                  cksum_type);
1629
1630                 if (peer->nid == req->rq_bulk->bd_sender) {
1631                         via = router = "";
1632                 } else {
1633                         via = " via ";
1634                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1635                 }
1636
1637                 if (server_cksum == ~0 && rc > 0) {
1638                         CERROR("Protocol error: server %s set the 'checksum' "
1639                                "bit, but didn't send a checksum.  Not fatal, "
1640                                "but please notify on http://bugs.whamcloud.com/\n",
1641                                libcfs_nid2str(peer->nid));
1642                 } else if (server_cksum != client_cksum) {
1643                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1644                                            "%s%s%s inode "DFID" object "
1645                                            LPU64"/"LPU64" extent "
1646                                            "["LPU64"-"LPU64"]\n",
1647                                            req->rq_import->imp_obd->obd_name,
1648                                            libcfs_nid2str(peer->nid),
1649                                            via, router,
1650                                            body->oa.o_valid & OBD_MD_FLFID ?
1651                                                 body->oa.o_parent_seq : (__u64)0,
1652                                            body->oa.o_valid & OBD_MD_FLFID ?
1653                                                 body->oa.o_parent_oid : 0,
1654                                            body->oa.o_valid & OBD_MD_FLFID ?
1655                                                 body->oa.o_parent_ver : 0,
1656                                            body->oa.o_id,
1657                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1658                                                 body->oa.o_seq : (__u64)0,
1659                                            aa->aa_ppga[0]->off,
1660                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1661                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1662                                                                         1);
1663                         CERROR("client %x, server %x, cksum_type %x\n",
1664                                client_cksum, server_cksum, cksum_type);
1665                         cksum_counter = 0;
1666                         aa->aa_oa->o_cksum = client_cksum;
1667                         rc = -EAGAIN;
1668                 } else {
1669                         cksum_counter++;
1670                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1671                         rc = 0;
1672                 }
1673         } else if (unlikely(client_cksum)) {
1674                 static int cksum_missed;
1675
1676                 cksum_missed++;
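                /* (x & -x) == x holds only for powers of two, so this error
                 * is logged with exponential backoff */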
1677                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1678                         CERROR("Checksum %u requested from %s but not sent\n",
1679                                cksum_missed, libcfs_nid2str(peer->nid));
1680         } else {
1681                 rc = 0;
1682         }
1683 out:
1684         if (rc >= 0)
1685                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1686
1687         RETURN(rc);
1688 }
1689
1690 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1691                             struct lov_stripe_md *lsm,
1692                             obd_count page_count, struct brw_page **pga,
1693                             struct obd_capa *ocapa)
1694 {
1695         struct ptlrpc_request *req;
1696         int                    rc;
1697         cfs_waitq_t            waitq;
1698         int                    generation, resends = 0;
1699         struct l_wait_info     lwi;
1700
1701         ENTRY;
1702
1703         cfs_waitq_init(&waitq);
1704         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1705
1706 restart_bulk:
1707         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1708                                   page_count, pga, &req, ocapa, 0, resends);
        if (rc != 0)
                RETURN(rc);
1711
1712         if (resends) {
1713                 req->rq_generation_set = 1;
1714                 req->rq_import_generation = generation;
1715                 req->rq_sent = cfs_time_current_sec() + resends;
1716         }
1717
1718         rc = ptlrpc_queue_wait(req);
1719
1720         if (rc == -ETIMEDOUT && req->rq_resend) {
1721                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1722                 ptlrpc_req_finished(req);
1723                 goto restart_bulk;
1724         }
1725
1726         rc = osc_brw_fini_request(req, rc);
1727
1728         ptlrpc_req_finished(req);
        /* When the server returns -EINPROGRESS, the client should always
         * retry, regardless of how many times the bulk was already resent. */
1731         if (osc_recoverable_error(rc)) {
1732                 resends++;
1733                 if (rc != -EINPROGRESS &&
1734                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1735                         CERROR("%s: too many resend retries for object: "
1736                                ""LPU64":"LPU64", rc = %d.\n",
1737                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1738                         goto out;
1739                 }
1740                 if (generation !=
1741                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1742                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1743                                ""LPU64":"LPU64", rc = %d.\n",
1744                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1745                         goto out;
1746                 }
1747
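                /* Back off before retrying: the wait condition is constant
                 * false, so only the 'resends'-second timeout (or a signal)
                 * ends this wait. */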
1748                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1749                                        NULL);
1750                 l_wait_event(waitq, 0, &lwi);
1751
1752                 goto restart_bulk;
1753         }
1754 out:
1755         if (rc == -EAGAIN || rc == -EINPROGRESS)
1756                 rc = -EIO;
        RETURN(rc);
1758 }
1759
1760 int osc_brw_redo_request(struct ptlrpc_request *request,
1761                          struct osc_brw_async_args *aa)
1762 {
1763         struct ptlrpc_request *new_req;
1764         struct ptlrpc_request_set *set = request->rq_set;
1765         struct osc_brw_async_args *new_aa;
1766         struct osc_async_page *oap;
1767         int rc = 0;
1768         ENTRY;
1769
1770         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1771
1772         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1773                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1774                                   aa->aa_cli, aa->aa_oa,
1775                                   NULL /* lsm unused by osc currently */,
1776                                   aa->aa_page_count, aa->aa_ppga,
1777                                   &new_req, aa->aa_ocapa, 0, 1);
1778         if (rc)
1779                 RETURN(rc);
1780
1781         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1782
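        /* Before committing to the resend, check that no page of the old
         * request was interrupted; if one was, the whole RPC is abandoned
         * with -EINTR instead of being requeued. */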
1783         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1784                 if (oap->oap_request != NULL) {
1785                         LASSERTF(request == oap->oap_request,
1786                                  "request %p != oap_request %p\n",
1787                                  request, oap->oap_request);
1788                         if (oap->oap_interrupted) {
1789                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1790                                 ptlrpc_req_finished(new_req);
1791                                 RETURN(-EINTR);
1792                         }
1793                 }
1794         }
        /* The new request takes over pga and oaps from the old request.
         * Note that copying a list_head doesn't work; it has to be moved. */
1797         aa->aa_resends++;
1798         new_req->rq_interpret_reply = request->rq_interpret_reply;
1799         new_req->rq_async_args = request->rq_async_args;
1800         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1801         new_req->rq_generation_set = 1;
1802         new_req->rq_import_generation = request->rq_import_generation;
1803
1804         new_aa = ptlrpc_req_async_args(new_req);
1805
1806         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1807         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1808         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1809
1810         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1811                 if (oap->oap_request) {
1812                         ptlrpc_req_finished(oap->oap_request);
1813                         oap->oap_request = ptlrpc_request_addref(new_req);
1814                 }
1815         }
1816
1817         new_aa->aa_ocapa = aa->aa_ocapa;
1818         aa->aa_ocapa = NULL;
1819
        /* Using ptlrpc_set_add_req() is safe here because interpret
         * functions run in check_set context.  The only path on which
         * another thread can access this request is the -EINTR case, and
         * that path is protected by cl_loi_list_lock. */
1824         ptlrpc_set_add_req(set, new_req);
1825
1826         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1827
1828         DEBUG_REQ(D_INFO, new_req, "new request");
1829         RETURN(0);
1830 }
1831
/*
 * Ugh, we want disk allocation on the target to happen in offset order.  We'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation.  It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's 1 and the array is sorted.
 */
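/*
 * A worked example of the 3h+1 stride schedule: for num = 100 the first loop
 * below grows the stride 1, 4, 13, 40, 121, and the sorting passes then run
 * with strides 40, 13, 4 and finally 1, where the last pass is a plain
 * insertion sort over an already almost-sorted array.
 */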
1839 static void sort_brw_pages(struct brw_page **array, int num)
1840 {
1841         int stride, i, j;
1842         struct brw_page *tmp;
1843
1844         if (num == 1)
1845                 return;
1846         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1847                 ;
1848
1849         do {
1850                 stride /= 3;
1851                 for (i = stride ; i < num ; i++) {
1852                         tmp = array[i];
1853                         j = i;
1854                         while (j >= stride && array[j - stride]->off > tmp->off) {
1855                                 array[j] = array[j - stride];
1856                                 j -= stride;
1857                         }
1858                         array[j] = tmp;
1859                 }
1860         } while (stride > 1);
1861 }
1862
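/*
 * Count how many leading pages of the offset-sorted array can be sent as one
 * unfragmented bulk: the first page may start mid-page, every later page must
 * start on a page boundary, and only the last may end short.  For example,
 * with 4KiB pages, [2048-4096) [4096-8192) [8192-10240) yields 3, while
 * [2048-4096) [4096-6144) [8192-12288) stops at 2 because the second page
 * ends short of a page boundary.
 */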
1863 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1864 {
1865         int count = 1;
1866         int offset;
1867         int i = 0;
1868
        LASSERT(pages > 0);
1870         offset = pg[i]->off & ~CFS_PAGE_MASK;
1871
1872         for (;;) {
1873                 pages--;
1874                 if (pages == 0)         /* that's all */
1875                         return count;
1876
1877                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1878                         return count;   /* doesn't end on page boundary */
1879
1880                 i++;
1881                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1882                 if (offset != 0)        /* doesn't start on page boundary */
1883                         return count;
1884
1885                 count++;
1886         }
1887 }
1888
1889 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1890 {
1891         struct brw_page **ppga;
1892         int i;
1893
1894         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1895         if (ppga == NULL)
1896                 return NULL;
1897
1898         for (i = 0; i < count; i++)
1899                 ppga[i] = pga + i;
1900         return ppga;
1901 }
1902
1903 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1904 {
1905         LASSERT(ppga != NULL);
1906         OBD_FREE(ppga, sizeof(*ppga) * count);
1907 }
1908
1909 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1910                    obd_count page_count, struct brw_page *pga,
1911                    struct obd_trans_info *oti)
1912 {
1913         struct obdo *saved_oa = NULL;
1914         struct brw_page **ppga, **orig;
1915         struct obd_import *imp = class_exp2cliimp(exp);
1916         struct client_obd *cli;
1917         int rc, page_count_orig;
1918         ENTRY;
1919
1920         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1921         cli = &imp->imp_obd->u.cli;
1922
1923         if (cmd & OBD_BRW_CHECK) {
1924                 /* The caller just wants to know if there's a chance that this
1925                  * I/O can succeed */
1926
1927                 if (imp->imp_invalid)
1928                         RETURN(-EIO);
1929                 RETURN(0);
1930         }
1931
1932         /* test_brw with a failed create can trip this, maybe others. */
1933         LASSERT(cli->cl_max_pages_per_rpc);
1934
1935         rc = 0;
1936
1937         orig = ppga = osc_build_ppga(pga, page_count);
1938         if (ppga == NULL)
1939                 RETURN(-ENOMEM);
1940         page_count_orig = page_count;
1941
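        /* Send the I/O in chunks of at most cl_max_pages_per_rpc
         * unfragmented pages.  osc_brw_internal() clobbers the obdo, so if
         * more than one chunk is needed a pristine copy is saved up front
         * and restored before each later chunk. */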
1942         sort_brw_pages(ppga, page_count);
1943         while (page_count) {
1944                 obd_count pages_per_brw;
1945
1946                 if (page_count > cli->cl_max_pages_per_rpc)
1947                         pages_per_brw = cli->cl_max_pages_per_rpc;
1948                 else
1949                         pages_per_brw = page_count;
1950
1951                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1952
1953                 if (saved_oa != NULL) {
1954                         /* restore previously saved oa */
1955                         *oinfo->oi_oa = *saved_oa;
1956                 } else if (page_count > pages_per_brw) {
1957                         /* save a copy of oa (brw will clobber it) */
1958                         OBDO_ALLOC(saved_oa);
1959                         if (saved_oa == NULL)
1960                                 GOTO(out, rc = -ENOMEM);
1961                         *saved_oa = *oinfo->oi_oa;
1962                 }
1963
1964                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1965                                       pages_per_brw, ppga, oinfo->oi_capa);
1966
1967                 if (rc != 0)
1968                         break;
1969
1970                 page_count -= pages_per_brw;
1971                 ppga += pages_per_brw;
1972         }
1973
1974 out:
1975         osc_release_ppga(orig, page_count_orig);
1976
1977         if (saved_oa != NULL)
1978                 OBDO_FREE(saved_oa);
1979
1980         RETURN(rc);
1981 }
1982
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting: either writeback completed or a truncate happened
 * before writing started.  Must be called with the loi lock held. */
1986 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1987                            int sent)
1988 {
1989         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1990 }
1991
1992
1993 /* This maintains the lists of pending pages to read/write for a given object
1994  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1995  * to quickly find objects that are ready to send an RPC. */
1996 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1997                          int cmd)
1998 {
1999         ENTRY;
2000
2001         if (lop->lop_num_pending == 0)
2002                 RETURN(0);
2003
2004         /* if we have an invalid import we want to drain the queued pages
2005          * by forcing them through rpcs that immediately fail and complete
2006          * the pages.  recovery relies on this to empty the queued pages
2007          * before canceling the locks and evicting down the llite pages */
2008         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2009                 RETURN(1);
2010
        /* stream rpcs in queue order as long as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
2015         if (!cfs_list_empty(&lop->lop_urgent)) {
2016                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
2017                 RETURN(1);
2018         }
2019
2020         if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going
                 * to create more pages to coalesce with what's waiting. */
2024                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
2025                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
2026                         RETURN(1);
2027                 }
2028         }
2029         if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc)
2030                 RETURN(1);
2031
2032         RETURN(0);
2033 }
2034
2035 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2036 {
2037         struct osc_async_page *oap;
2038         ENTRY;
2039
2040         if (cfs_list_empty(&lop->lop_urgent))
2041                 RETURN(0);
2042
        oap = cfs_list_entry(lop->lop_urgent.next,
                             struct osc_async_page, oap_urgent_item);
2045
2046         if (oap->oap_async_flags & ASYNC_HP) {
2047                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2048                 RETURN(1);
2049         }
2050
2051         RETURN(0);
2052 }
2053
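/* Keep @item's membership of @list in sync with the boolean @should_be_on. */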
2054 static void on_list(cfs_list_t *item, cfs_list_t *list,
2055                     int should_be_on)
2056 {
2057         if (cfs_list_empty(item) && should_be_on)
2058                 cfs_list_add_tail(item, list);
2059         else if (!cfs_list_empty(item) && !should_be_on)
2060                 cfs_list_del_init(item);
2061 }
2062
2063 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2064  * can find pages to build into rpcs quickly */
2065 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2066 {
2067         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2068             lop_makes_hprpc(&loi->loi_read_lop)) {
2069                 /* HP rpc */
2070                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2071                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2072         } else {
2073                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2074                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2075                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2076                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2077         }
2078
2079         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2080                 loi->loi_write_lop.lop_num_pending);
2081
2082         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2083                 loi->loi_read_lop.lop_num_pending);
2084 }
2085
2086 static void lop_update_pending(struct client_obd *cli,
2087                                struct loi_oap_pages *lop, int cmd, int delta)
2088 {
2089         lop->lop_num_pending += delta;
2090         if (cmd & OBD_BRW_WRITE)
2091                 cli->cl_pending_w_pages += delta;
2092         else
2093                 cli->cl_pending_r_pages += delta;
2094 }
2095
/**
 * This is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in
 * an rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc
 * as desiring interruption, which will forcefully complete the rpc once the
 * rpc has timed out.
 */
2103 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2104 {
2105         struct loi_oap_pages *lop;
2106         struct lov_oinfo *loi;
2107         int rc = -EBUSY;
2108         ENTRY;
2109
2110         LASSERT(!oap->oap_interrupted);
2111         oap->oap_interrupted = 1;
2112
2113         /* ok, it's been put in an rpc. only one oap gets a request reference */
2114         if (oap->oap_request != NULL) {
2115                 ptlrpc_mark_interrupted(oap->oap_request);
2116                 ptlrpcd_wake(oap->oap_request);
2117                 ptlrpc_req_finished(oap->oap_request);
2118                 oap->oap_request = NULL;
2119         }
2120
        /*
         * page completion may be called only if ->cpo_prep() method was
         * executed by osc_io_submit(), which also adds the page to the
         * pending list
         */
2125         if (!cfs_list_empty(&oap->oap_pending_item)) {
2126                 cfs_list_del_init(&oap->oap_pending_item);
2127                 cfs_list_del_init(&oap->oap_urgent_item);
2128
2129                 loi = oap->oap_loi;
2130                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2131                         &loi->loi_write_lop : &loi->loi_read_lop;
2132                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2133                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2134                 rc = oap->oap_caller_ops->ap_completion(env,
2135                                           oap->oap_caller_data,
2136                                           oap->oap_cmd, NULL, -EINTR);
2137         }
2138
2139         RETURN(rc);
2140 }
2141
/* This is trying to propagate async writeback errors back up to the
 * application.  When an async write fails we record the error code for later
 * in case the app does an fsync.  As long as errors persist we force future
 * rpcs to be sync so that the app can get a sync error and break the cycle
 * of queueing pages for which writeback will fail. */
2147 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2148                            int rc)
2149 {
2150         if (rc) {
2151                 if (!ar->ar_rc)
2152                         ar->ar_rc = rc;
2153
2154                 ar->ar_force_sync = 1;
2155                 ar->ar_min_xid = ptlrpc_sample_next_xid();
                return;
        }
2159
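        /* Only an rpc issued after the failure was recorded (xid >=
         * ar_min_xid) shows that writeback works again; completions of
         * older in-flight rpcs must not clear the force-sync state. */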
2160         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2161                 ar->ar_force_sync = 0;
2162 }
2163
2164 void osc_oap_to_pending(struct osc_async_page *oap)
2165 {
2166         struct loi_oap_pages *lop;
2167
2168         if (oap->oap_cmd & OBD_BRW_WRITE)
2169                 lop = &oap->oap_loi->loi_write_lop;
2170         else
2171                 lop = &oap->oap_loi->loi_read_lop;
2172
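        /* ASYNC_HP pages jump to the head of the urgent queue, plain urgent
         * pages go to its tail; every page also joins the pending list. */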
2173         if (oap->oap_async_flags & ASYNC_HP)
2174                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2175         else if (oap->oap_async_flags & ASYNC_URGENT)
2176                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2177         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2178         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2179 }
2180
2181 /* this must be called holding the loi list lock to give coverage to exit_cache,
2182  * async_flag maintenance, and oap_request */
2183 static void osc_ap_completion(const struct lu_env *env,
2184                               struct client_obd *cli, struct obdo *oa,
2185                               struct osc_async_page *oap, int sent, int rc)
2186 {
2187         __u64 xid = 0;
2188
2189         ENTRY;
2190         if (oap->oap_request != NULL) {
2191                 xid = ptlrpc_req_xid(oap->oap_request);
2192                 ptlrpc_req_finished(oap->oap_request);
2193                 oap->oap_request = NULL;
2194         }
2195
2196         cfs_spin_lock(&oap->oap_lock);
2197         oap->oap_async_flags = 0;
2198         cfs_spin_unlock(&oap->oap_lock);
2199         oap->oap_interrupted = 0;
2200
2201         if (oap->oap_cmd & OBD_BRW_WRITE) {
2202                 osc_process_ar(&cli->cl_ar, xid, rc);
2203                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2204         }
2205
2206         if (rc == 0 && oa != NULL) {
2207                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2208                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2209                 if (oa->o_valid & OBD_MD_FLMTIME)
2210                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2211                 if (oa->o_valid & OBD_MD_FLATIME)
2212                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2213                 if (oa->o_valid & OBD_MD_FLCTIME)
2214                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2215         }
2216
2217         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2218                                                 oap->oap_cmd, oa, rc);
2219
        /* cl_page_completion() drops PG_locked, so a new I/O on the page
         * could start; but OSC calls it under the lock, and thus we can add
         * the oap back to pending safely */
2223         if (rc)
2224                 /* upper layer wants to leave the page on pending queue */
2225                 osc_oap_to_pending(oap);
2226         else
2227                 osc_exit_cache(cli, oap, sent);
2228         EXIT;
2229 }
2230
2231 static int brw_queue_work(const struct lu_env *env, void *data)
2232 {
2233         struct client_obd *cli = data;
2234
2235         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2236
2237         client_obd_list_lock(&cli->cl_loi_list_lock);
2238         osc_check_rpcs0(env, cli, 1);
2239         client_obd_list_unlock(&cli->cl_loi_list_lock);
2240         RETURN(0);
2241 }
2242
2243 static int brw_interpret(const struct lu_env *env,
2244                          struct ptlrpc_request *req, void *data, int rc)
2245 {
2246         struct osc_brw_async_args *aa = data;
2247         struct client_obd *cli;
2248         int async;
2249         ENTRY;
2250
2251         rc = osc_brw_fini_request(req, rc);
2252         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When the server returns -EINPROGRESS, the client should always
         * retry, regardless of how many times the bulk was already resent. */
2255         if (osc_recoverable_error(rc)) {
2256                 if (req->rq_import_generation !=
2257                     req->rq_import->imp_generation) {
2258                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2259                                ""LPU64":"LPU64", rc = %d.\n",
2260                                req->rq_import->imp_obd->obd_name,
2261                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
2262                 } else if (rc == -EINPROGRESS ||
2263                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2264                         rc = osc_brw_redo_request(req, aa);
2265                 } else {
                        CERROR("%s: too many resend retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
2270                 }
2271
2272                 if (rc == 0)
2273                         RETURN(0);
2274                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2275                         rc = -EIO;
2276         }
2277
2278         if (aa->aa_ocapa) {
2279                 capa_put(aa->aa_ocapa);
2280                 aa->aa_ocapa = NULL;
2281         }
2282
2283         cli = aa->aa_cli;
2284         client_obd_list_lock(&cli->cl_loi_list_lock);
2285
2286         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2287          * is called so we know whether to go to sync BRWs or wait for more
2288          * RPCs to complete */
2289         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2290                 cli->cl_w_in_flight--;
2291         else
2292                 cli->cl_r_in_flight--;
2293
2294         async = cfs_list_empty(&aa->aa_oaps);
2295         if (!async) { /* from osc_send_oap_rpc() */
2296                 struct osc_async_page *oap, *tmp;
2297                 /* the caller may re-use the oap after the completion call so
2298                  * we need to clean it up a little */
2299                 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2300                                              oap_rpc_item) {
2301                         cfs_list_del_init(&oap->oap_rpc_item);
2302                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2303                 }
2304                 OBDO_FREE(aa->aa_oa);
2305         } else { /* from async_internal() */
2306                 obd_count i;
2307                 for (i = 0; i < aa->aa_page_count; i++)
2308                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2309         }
2310         osc_wake_cache_waiters(cli);
2311         osc_check_rpcs0(env, cli, 1);
2312         client_obd_list_unlock(&cli->cl_loi_list_lock);
2313
2314         if (!async)
2315                 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2316                                   req->rq_bulk->bd_nob_transferred);
2317         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2318         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2319
2320         RETURN(rc);
2321 }
2322
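/*
 * Build a single BRW RPC from the oaps on @rpc_list: allocate the brw_page
 * array and the obdo, wrap the pages in a cl_req so the layers above can
 * fill in the attributes, then hand everything to osc_brw_prep_request().
 * On success the oaps are moved onto aa->aa_oaps for brw_interpret(); on
 * failure every oap is completed with the error.
 */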
2323 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2324                                             struct client_obd *cli,
2325                                             cfs_list_t *rpc_list,
2326                                             int page_count, int cmd)
2327 {
2328         struct ptlrpc_request *req;
2329         struct brw_page **pga = NULL;
2330         struct osc_brw_async_args *aa;
2331         struct obdo *oa = NULL;
2332         const struct obd_async_page_ops *ops = NULL;
2333         struct osc_async_page *oap;
2334         struct osc_async_page *tmp;
2335         struct cl_req *clerq = NULL;
2336         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2337         struct ldlm_lock *lock = NULL;
2338         struct cl_req_attr crattr;
2339         int i, rc, mpflag = 0;
2340
2341         ENTRY;
2342         LASSERT(!cfs_list_empty(rpc_list));
2343
2344         if (cmd & OBD_BRW_MEMALLOC)
2345                 mpflag = cfs_memory_pressure_get_and_set();
2346
2347         memset(&crattr, 0, sizeof crattr);
2348         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2349         if (pga == NULL)
2350                 GOTO(out, req = ERR_PTR(-ENOMEM));
2351
2352         OBDO_ALLOC(oa);
2353         if (oa == NULL)
2354                 GOTO(out, req = ERR_PTR(-ENOMEM));
2355
2356         i = 0;
2357         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2358                 struct cl_page *page = osc_oap2cl_page(oap);
2359                 if (ops == NULL) {
2360                         ops = oap->oap_caller_ops;
2361
2362                         clerq = cl_req_alloc(env, page, crt,
2363                                              1 /* only 1-object rpcs for
2364                                                 * now */);
2365                         if (IS_ERR(clerq))
2366                                 GOTO(out, req = (void *)clerq);
2367                         lock = oap->oap_ldlm_lock;
2368                 }
2369                 pga[i] = &oap->oap_brw_page;
2370                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2371                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2372                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2373                 i++;
2374                 cl_req_page_add(env, clerq, page);
2375         }
2376
2377         /* always get the data for the obdo for the rpc */
2378         LASSERT(ops != NULL);
2379         crattr.cra_oa = oa;
2380         crattr.cra_capa = NULL;
2381         memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2382         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2383         if (lock) {
2384                 oa->o_handle = lock->l_remote_handle;
2385                 oa->o_valid |= OBD_MD_FLHANDLE;
2386         }
2387
2388         rc = cl_req_prep(env, clerq);
2389         if (rc != 0) {
2390                 CERROR("cl_req_prep failed: %d\n", rc);
2391                 GOTO(out, req = ERR_PTR(rc));
2392         }
2393
2394         sort_brw_pages(pga, page_count);
2395         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2396                                   pga, &req, crattr.cra_capa, 1, 0);
2397         if (rc != 0) {
2398                 CERROR("prep_req failed: %d\n", rc);
2399                 GOTO(out, req = ERR_PTR(rc));
2400         }
2401
2402         if (cmd & OBD_BRW_MEMALLOC)
2403                 req->rq_memalloc = 1;
2404
        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or queued at the OST).  If the OST
         * gets the later setattr before the earlier BRW (as determined by
         * the request xid), the OST will not use the BRW timestamps.
         * Sadly, there is no obvious way to do this in a single call.
         * bug 10150 */
2410         cl_req_attr_set(env, clerq, &crattr,
2411                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2412
2413         lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2414
2415         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2416         aa = ptlrpc_req_async_args(req);
2417         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2418         cfs_list_splice(rpc_list, &aa->aa_oaps);
2419         CFS_INIT_LIST_HEAD(rpc_list);
2420         aa->aa_clerq = clerq;
2421 out:
2422         if (cmd & OBD_BRW_MEMALLOC)
2423                 cfs_memory_pressure_restore(mpflag);
2424
2425         capa_put(crattr.cra_capa);
2426         if (IS_ERR(req)) {
2427                 if (oa)
2428                         OBDO_FREE(oa);
2429                 if (pga)
2430                         OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad; it makes the
                 * pending list stop following the dirty order */
2433                 client_obd_list_lock(&cli->cl_loi_list_lock);
2434                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2435                         cfs_list_del_init(&oap->oap_rpc_item);
2436
2437                         /* queued sync pages can be torn down while the pages
2438                          * were between the pending list and the rpc */
2439                         if (oap->oap_interrupted) {
2440                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2441                                 osc_ap_completion(env, cli, NULL, oap, 0,
2442                                                   oap->oap_count);
2443                                 continue;
2444                         }
2445                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2446                 }
2447                 if (clerq && !IS_ERR(clerq))
2448                         cl_req_completion(env, clerq, PTR_ERR(req));
2449         }
2450         RETURN(req);
2451 }
2452
2453 /**
2454  * prepare pages for ASYNC io and put pages in send queue.
2455  *
 * \param cmd OBD_BRW_* macros
2457  * \param lop pending pages
2458  *
2459  * \return zero if no page added to send queue.
2460  * \return 1 if pages successfully added to send queue.
2461  * \return negative on errors.
2462  */
2463 static int
2464 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2465                  struct lov_oinfo *loi, int cmd,
2466                  struct loi_oap_pages *lop, pdl_policy_t pol)
2467 {
2468         struct ptlrpc_request *req;
2469         obd_count page_count = 0;
2470         struct osc_async_page *oap = NULL, *tmp;
2471         struct osc_brw_async_args *aa;
2472         const struct obd_async_page_ops *ops;
2473         CFS_LIST_HEAD(rpc_list);
2474         int srvlock = 0, mem_tight = 0;
2475         struct cl_object *clob = NULL;
2476         obd_off starting_offset = OBD_OBJECT_EOF;
2477         unsigned int ending_offset;
2478         int starting_page_off = 0;
2479         ENTRY;
2480
        /* ASYNC_HP pages first.  At present, when the lock covering the pages
         * is to be canceled, the pages under that lock are sent out with
         * ASYNC_HP.  We have to send them out as soon as possible. */
2484         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2485                 if (oap->oap_async_flags & ASYNC_HP)
2486                         cfs_list_move(&oap->oap_pending_item, &rpc_list);
2487                 else if (!(oap->oap_brw_flags & OBD_BRW_SYNC))
2488                         /* only do this for writeback pages. */
2489                         cfs_list_move_tail(&oap->oap_pending_item, &rpc_list);
2490                 if (++page_count >= cli->cl_max_pages_per_rpc)
2491                         break;
2492         }
2493         cfs_list_splice_init(&rpc_list, &lop->lop_pending);
2494         page_count = 0;
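        /* The pass above only reordered lop_pending so that HP and writeback
         * pages sit at its head; rpc_list is empty again and the real
         * selection pass below starts from scratch. */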
2495
2496         /* first we find the pages we're allowed to work with */
2497         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2498                                      oap_pending_item) {
2499                 ops = oap->oap_caller_ops;
2500
2501                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2502                          "magic 0x%x\n", oap, oap->oap_magic);
2503
2504                 if (clob == NULL) {
2505                         /* pin object in memory, so that completion call-backs
2506                          * can be safely called under client_obd_list lock. */
2507                         clob = osc_oap2cl_page(oap)->cp_obj;
2508                         cl_object_get(clob);
2509                 }
2510
2511                 if (page_count != 0 &&
2512                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2513                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2514                                " oap %p, page %p, srvlock %u\n",
2515                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2516                         break;
2517                 }
2518
2519                 /* If there is a gap at the start of this page, it can't merge
2520                  * with any previous page, so we'll hand the network a
2521                  * "fragmented" page array that it can't transfer in 1 RDMA */
2522                 if (oap->oap_obj_off < starting_offset) {
2523                         if (starting_page_off != 0)
2524                                 break;
2525
2526                         starting_page_off = oap->oap_page_off;
2527                         starting_offset = oap->oap_obj_off + starting_page_off;
2528                 } else if (oap->oap_page_off != 0)
2529                         break;
2530
2531                 /* in llite being 'ready' equates to the page being locked
2532                  * until completion unlocks it.  commit_write submits a page
2533                  * as not ready because its unlock will happen unconditionally
2534                  * as the call returns.  if we race with commit_write giving
2535                  * us that page we don't want to create a hole in the page
2536                  * stream, so we stop and leave the rpc to be fired by
2537                  * another dirtier or kupdated interval (the not ready page
2538                  * will still be on the dirty list).  we could call in
2539                  * at the end of ll_file_write to process the queue again. */
2540                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2541                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2542                                                     cmd);
2543                         if (rc < 0)
2544                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2545                                                 "instead of ready\n", oap,
2546                                                 oap->oap_page, rc);
2547                         switch (rc) {
2548                         case -EAGAIN:
2549                                 /* llite is telling us that the page is still
2550                                  * in commit_write and that we should try
2551                                  * and put it in an rpc again later.  we
2552                                  * break out of the loop so we don't create
2553                                  * a hole in the sequence of pages in the rpc
2554                                  * stream.*/
2555                                 oap = NULL;
2556                                 break;
2557                         case -EINTR:
                                /* the io isn't needed; tell the checks
                                 * below to complete the rpc with EINTR */
2560                                 cfs_spin_lock(&oap->oap_lock);
2561                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2562                                 cfs_spin_unlock(&oap->oap_lock);
2563                                 oap->oap_count = -EINTR;
2564                                 break;
2565                         case 0:
2566                                 cfs_spin_lock(&oap->oap_lock);
2567                                 oap->oap_async_flags |= ASYNC_READY;
2568                                 cfs_spin_unlock(&oap->oap_lock);
2569                                 break;
2570                         default:
2571                                 LASSERTF(0, "oap %p page %p returned %d "
2572                                             "from make_ready\n", oap,
2573                                             oap->oap_page, rc);
2574                                 break;
2575                         }
2576                 }
2577                 if (oap == NULL)
2578                         break;
2579
2580                 /* take the page out of our book-keeping */
2581                 cfs_list_del_init(&oap->oap_pending_item);
2582                 lop_update_pending(cli, lop, cmd, -1);
2583                 cfs_list_del_init(&oap->oap_urgent_item);
2584
2585                 /* ask the caller for the size of the io as the rpc leaves. */
2586                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2587                         oap->oap_count =
2588                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2589                                                       cmd);
2590                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2591                 }
2592                 if (oap->oap_count <= 0) {
2593                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2594                                oap->oap_count);
2595                         osc_ap_completion(env, cli, NULL,
2596                                           oap, 0, oap->oap_count);
2597                         continue;
2598                 }
2599
2600                 /* now put the page back in our accounting */
2601                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2602                 if (page_count++ == 0)
2603                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2604
2605                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2606                         mem_tight = 1;
2607
2608                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2609                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2610                  * have the same alignment as the initial writes that allocated
2611                  * extents on the server. */
2612                 ending_offset = oap->oap_obj_off + oap->oap_page_off +
2613                                 oap->oap_count;
2614                 if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
2615                         break;
2616
2617                 if (page_count >= cli->cl_max_pages_per_rpc)
2618                         break;
2619
2620                 /* If there is a gap at the end of this page, it can't merge
2621                  * with any subsequent pages, so we'll hand the network a
2622                  * "fragmented" page array that it can't transfer in 1 RDMA */
2623                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2624                         break;
2625         }
2626
2627         loi_list_maint(cli, loi);
2628
2629         client_obd_list_unlock(&cli->cl_loi_list_lock);
2630
2631         if (clob != NULL)
2632                 cl_object_put(env, clob);
2633
2634         if (page_count == 0) {
2635                 client_obd_list_lock(&cli->cl_loi_list_lock);
2636                 RETURN(0);
2637         }
2638
2639         req = osc_build_req(env, cli, &rpc_list, page_count,
2640                             mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2641         if (IS_ERR(req)) {
2642                 LASSERT(cfs_list_empty(&rpc_list));
2643                 loi_list_maint(cli, loi);
2644                 RETURN(PTR_ERR(req));
2645         }
2646
2647         aa = ptlrpc_req_async_args(req);
2648
2649         starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
2650         if (cmd == OBD_BRW_READ) {
2651                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2652                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2653                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2654                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2655         } else {
2656                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2657                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2658                                  cli->cl_w_in_flight);
2659                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2660                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2661         }
2662
2663         client_obd_list_lock(&cli->cl_loi_list_lock);
2664
2665         if (cmd == OBD_BRW_READ)
2666                 cli->cl_r_in_flight++;
2667         else
2668                 cli->cl_w_in_flight++;
2669
2670         /* queued sync pages can be torn down while the pages
2671          * are between the pending list and the rpc */
2672         tmp = NULL;
2673         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2674                 /* only one oap gets a request reference */
2675                 if (tmp == NULL)
2676                         tmp = oap;
2677                 if (oap->oap_interrupted && !req->rq_intr) {
2678                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2679                                oap, req);
2680                         ptlrpc_mark_interrupted(req);
2681                 }
2682         }
2683         if (tmp != NULL)
2684                 tmp->oap_request = ptlrpc_request_addref(req);
2685
2686         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2687                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2688
2689         req->rq_interpret_reply = brw_interpret;
2690
2691         /* XXX: Maybe the caller can check the RPC bulk descriptor to see which
2692          *      CPU/NUMA node the majority of pages were allocated on, and try
2693          *      to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED)
2694          *      to reduce cross-CPU memory traffic.
2695          *
2696          *      But on the other hand, we expect that multiple ptlrpcd threads
2697          *      and the initial write sponsor can run in parallel, especially
2698          *      when data checksum is enabled, a CPU-bound operation that a
2699          *      single ptlrpcd thread cannot process in time. So more ptlrpcd
2700          *      threads sharing the BRW load (with PDL_POLICY_ROUND) seems better.
2701          */
2702         ptlrpcd_add_req(req, pol, -1);
2703         RETURN(1);
2704 }
2705
2706 #define LOI_DEBUG(LOI, STR, args...)                                     \
2707         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2708                !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
2709                !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
2710                (LOI)->loi_write_lop.lop_num_pending,                     \
2711                !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
2712                (LOI)->loi_read_lop.lop_num_pending,                      \
2713                !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
2714                args)                                                     \
2715
2716 /* This is called by osc_check_rpcs() to find which objects have pages that
2717  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2718 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2719 {
2720         ENTRY;
2721
2722         /* First return objects that have blocked locks so that they
2723          * will be flushed quickly and other clients can get the lock,
2724          * then objects which have pages ready to be stuffed into RPCs */
2725         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2726                 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2727                                       struct lov_oinfo, loi_hp_ready_item));
2728         if (!cfs_list_empty(&cli->cl_loi_ready_list))
2729                 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2730                                       struct lov_oinfo, loi_ready_item));
2731
2732         /* then if we have cache waiters, return all objects with queued
2733          * writes.  This is especially important when many small files
2734          * have filled up the cache and not been fired into rpcs because
2735          * they don't pass the nr_pending/object threshold */
2736         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2737             !cfs_list_empty(&cli->cl_loi_write_list))
2738                 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2739                                       struct lov_oinfo, loi_write_item));
2740
2741         /* then return all queued objects when we have an invalid import
2742          * so that they get flushed */
2743         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2744                 if (!cfs_list_empty(&cli->cl_loi_write_list))
2745                         RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2746                                               struct lov_oinfo,
2747                                               loi_write_item));
2748                 if (!cfs_list_empty(&cli->cl_loi_read_list))
2749                         RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2750                                               struct lov_oinfo, loi_read_item));
2751         }
2752         RETURN(NULL);
2753 }
2754
2755 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2756 {
2757         struct osc_async_page *oap;
2758         int hprpc = 0;
2759
2760         if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2761                 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2762                                      struct osc_async_page, oap_urgent_item);
2763                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2764         }
2765
2766         if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2767                 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2768                                      struct osc_async_page, oap_urgent_item);
2769                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2770         }
2771
2772         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2773 }
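/*
 * Illustration with hypothetical numbers: with cl_max_rpcs_in_flight == 8
 * and an ASYNC_HP page at the head of an urgent list, hprpc == 1 and the
 * effective limit becomes 9, reserving one extra slot so high-priority
 * I/O (typically write-out for a blocked lock) is never starved by
 * ordinary RPCs already sitting at the limit.
 */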
2774
2775 /* called with the loi list lock held */
2776 static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc)
2777 {
2778         struct lov_oinfo *loi;
2779         int rc = 0, race_counter = 0;
2780         pdl_policy_t pol;
2781         ENTRY;
2782
2783         pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND;
2784
2785         while ((loi = osc_next_loi(cli)) != NULL) {
2786                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2787
2788                 if (osc_max_rpc_in_flight(cli, loi))
2789                         break;
2790
2791                 /* attempt some read/write balancing by alternating between
2792                  * reads and writes in an object.  The makes_rpc checks here
2793                  * would be redundant if we were getting read/write work items
2794                  * instead of objects.  We don't want send_oap_rpc to drain a
2795                  * partial read pending queue when we're given this object to
2796                  * do I/O on for writes while there are cache waiters. */
2797                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2798                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2799                                               &loi->loi_write_lop, pol);
2800                         if (rc < 0) {
2801                                 CERROR("Write request failed with %d\n", rc);
2802
2803                                 /* osc_send_oap_rpc failed, mostly because of
2804                                  * memory pressure.
2805                                  *
2806                                  * It can't break here, because if:
2807                                  *  - a page was submitted by osc_io_submit, so
2808                                  *    page locked;
2809                                  *  - no request in flight
2810                                  *  - no subsequent request
2811                                  * The system will be in live-lock state,
2812                                  * because there is no chance to call
2813                                  * osc_io_unplug() and osc_check_rpcs() any
2814                                  * more. pdflush can't help in this case,
2815                                  * because it might be blocked at grabbing
2816                                  * the page lock as we mentioned.
2817                                  *
2818                                  * Anyway, continue to drain pages. */
2819                                 /* break; */
2820                         }
2821
2822                         if (rc > 0)
2823                                 race_counter = 0;
2824                         else if (rc == 0)
2825                                 race_counter++;
2826                 }
2827                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2828                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2829                                               &loi->loi_read_lop, pol);
2830                         if (rc < 0)
2831                                 CERROR("Read request failed with %d\n", rc);
2832
2833                         if (rc > 0)
2834                                 race_counter = 0;
2835                         else if (rc == 0)
2836                                 race_counter++;
2837                 }
2838
2839                 /* attempt some inter-object balancing by issuing rpcs
2840                  * for each object in turn */
2841                 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2842                         cfs_list_del_init(&loi->loi_hp_ready_item);
2843                 if (!cfs_list_empty(&loi->loi_ready_item))
2844                         cfs_list_del_init(&loi->loi_ready_item);
2845                 if (!cfs_list_empty(&loi->loi_write_item))
2846                         cfs_list_del_init(&loi->loi_write_item);
2847                 if (!cfs_list_empty(&loi->loi_read_item))
2848                         cfs_list_del_init(&loi->loi_read_item);
2849
2850                 loi_list_maint(cli, loi);
2851
2852                 /* osc_send_oap_rpc returns 0 when make_ready tells it to
2853                  * back off.  llite's make_ready does this when it tries
2854                  * to lock a page queued for write that is already locked.
2855                  * We want to try sending RPCs from many objects, but we
2856                  * don't want to spin failing with 0.  */
2857                 if (race_counter == 10)
2858                         break;
2859         }
2860 }
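/*
 * Concrete reading of the race_counter logic above: if ten objects in a
 * row yield rc == 0 from osc_send_oap_rpc() (make_ready backed off every
 * time), the loop bails out instead of spinning; any successful send
 * (rc > 0) resets the counter to zero.
 */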
2861
2862 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2863 {
2864         osc_check_rpcs0(env, cli, 0);
2865 }
2866
2867 /**
2868  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2869  * is available.
2870  */
2871 int osc_enter_cache_try(const struct lu_env *env,
2872                         struct client_obd *cli, struct lov_oinfo *loi,
2873                         struct osc_async_page *oap, int transient)
2874 {
2875         int has_grant;
2876
2877         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2878         if (has_grant) {
2879                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2880                 if (transient) {
2881                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2882                         cfs_atomic_inc(&obd_dirty_transit_pages);
2883                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2884                 }
2885         }
2886         return has_grant;
2887 }
2888
2889 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2890  * grant or cache space. */
2891 static int osc_enter_cache(const struct lu_env *env,
2892                            struct client_obd *cli, struct lov_oinfo *loi,
2893                            struct osc_async_page *oap)
2894 {
2895         struct osc_cache_waiter ocw;
2896         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2897         int rc = -EDQUOT;
2898         ENTRY;
2899
2900         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2901                "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2902                cli->cl_dirty_max, obd_max_dirty_pages,
2903                cli->cl_lost_grant, cli->cl_avail_grant);
2904
2905         /* force the caller to try sync io.  this can jump the list
2906          * of queued writes and create a discontiguous rpc stream */
2907         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
2908             cli->cl_dirty_max < CFS_PAGE_SIZE     ||
2909             cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
2910                 RETURN(-EDQUOT);
2911
2912         /* Hopefully normal case - cache space and write credits available */
2913         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2914             cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2915             osc_enter_cache_try(env, cli, loi, oap, 0))
2916                 RETURN(0);
2917
2918         /* We can get here for two reasons: too many dirty pages in cache, or
2919          * run out of grants. In both cases we should write dirty pages out.
2920          * Adding a cache waiter will trigger urgent write-out no matter what
2921          * RPC size will be.
2922          * The exit condition is no available grant and no cached dirty pages,
2923          * which really means there is no space left on the OST. */
2924         cfs_waitq_init(&ocw.ocw_waitq);
2925         ocw.ocw_oap = oap;
2926         while (cli->cl_dirty > 0) {
2927                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2928                 ocw.ocw_rc = 0;
2929
2930                 loi_list_maint(cli, loi);
2931                 osc_check_rpcs(env, cli);
2932                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2933
2934                 CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
2935                        cli->cl_import->imp_obd->obd_name, &ocw, oap);
2936
2937                 rc = l_wait_event(ocw.ocw_waitq, cfs_list_empty(&ocw.ocw_entry), &lwi);
2938
2939                 client_obd_list_lock(&cli->cl_loi_list_lock);
2940                 cfs_list_del_init(&ocw.ocw_entry);
2941                 if (rc < 0)
2942                         break;
2943
2944                 rc = ocw.ocw_rc;
2945                 if (rc != -EDQUOT)
2946                         break;
2947         }
2948
2949         RETURN(rc);
2950 }
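/*
 * Sketch of the waiter hand-off implemented above: a writer that cannot
 * get grant queues an osc_cache_waiter on cl_cache_waiters and sleeps on
 * ocw_waitq; when a completing RPC returns grant, the wakeup path
 * (osc_wake_cache_waiters()) removes the entry and wakes the sleeper, so
 * cfs_list_empty(&ocw.ocw_entry) doubles as the wakeup condition and
 * ocw_rc carries back -EDQUOT when the OST is genuinely out of space.
 */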
2951
2952
2953 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2954                         struct lov_oinfo *loi, cfs_page_t *page,
2955                         obd_off offset, const struct obd_async_page_ops *ops,
2956                         void *data, void **res, int nocache,
2957                         struct lustre_handle *lockh)
2958 {
2959         struct osc_async_page *oap;
2960
2961         ENTRY;
2962
2963         if (!page)
2964                 return cfs_size_round(sizeof(*oap));
2965
2966         oap = *res;
2967         oap->oap_magic = OAP_MAGIC;
2968         oap->oap_cli = &exp->exp_obd->u.cli;
2969         oap->oap_loi = loi;
2970
2971         oap->oap_caller_ops = ops;
2972         oap->oap_caller_data = data;
2973
2974         oap->oap_page = page;
2975         oap->oap_obj_off = offset;
2976         if (!client_is_remote(exp) &&
2977             cfs_capable(CFS_CAP_SYS_RESOURCE))
2978                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2979
2980         LASSERT(!(offset & ~CFS_PAGE_MASK));
2981
2982         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2983         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2984         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2985         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2986
2987         cfs_spin_lock_init(&oap->oap_lock);
2988         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2989         RETURN(0);
2990 }
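/*
 * Sketch of the two-phase contract above (hypothetical caller): invoked
 * once with page == NULL the function only reports the cookie size,
 *
 *         int size = osc_prep_async_page(exp, NULL, NULL, NULL, 0,
 *                                        NULL, NULL, NULL, 0, NULL);
 *
 * letting the upper layer reserve that many bytes per page; a second call
 * with *res pointing at the reserved space then initializes the oap.
 */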
2991
2992 int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
2993                        struct lov_stripe_md *lsm, struct lov_oinfo *loi,
2994                        struct osc_async_page *oap, int cmd, int off,
2995                        int count, obd_flag brw_flags, enum async_flags async_flags)
2996 {
2997         struct client_obd *cli = &exp->exp_obd->u.cli;
2998         int rc = 0;
2999         ENTRY;
3000
3001         if (oap->oap_magic != OAP_MAGIC)
3002                 RETURN(-EINVAL);
3003
3004         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
3005                 RETURN(-EIO);
3006
3007         if (!cfs_list_empty(&oap->oap_pending_item) ||
3008             !cfs_list_empty(&oap->oap_urgent_item) ||
3009             !cfs_list_empty(&oap->oap_rpc_item))
3010                 RETURN(-EBUSY);
3011
3012         /* check if the file's owner/group is over quota */
3013         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
3014                 struct cl_object *obj;
3015                 struct cl_attr    attr; /* XXX put attr into thread info */
3016                 unsigned int qid[MAXQUOTAS];
3017
3018                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
3019
3020                 cl_object_attr_lock(obj);
3021                 rc = cl_object_attr_get(env, obj, &attr);
3022                 cl_object_attr_unlock(obj);
3023
3024                 qid[USRQUOTA] = attr.cat_uid;
3025                 qid[GRPQUOTA] = attr.cat_gid;
3026                 if (rc == 0 &&
3027                     osc_quota_chkdq(cli, qid) == NO_QUOTA)
3028                         rc = -EDQUOT;
3029                 if (rc)
3030                         RETURN(rc);
3031         }
3032
3033         if (loi == NULL)
3034                 loi = lsm->lsm_oinfo[0];
3035
3036         client_obd_list_lock(&cli->cl_loi_list_lock);
3037
3038         LASSERT(off + count <= CFS_PAGE_SIZE);
3039         oap->oap_cmd = cmd;
3040         oap->oap_page_off = off;
3041         oap->oap_count = count;
3042         oap->oap_brw_flags = brw_flags;
3043         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3044         if (cfs_memory_pressure_get())
3045                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3046         cfs_spin_lock(&oap->oap_lock);
3047         oap->oap_async_flags = async_flags;
3048         cfs_spin_unlock(&oap->oap_lock);
3049
3050         if (cmd & OBD_BRW_WRITE) {
3051                 rc = osc_enter_cache(env, cli, loi, oap);
3052                 if (rc) {
3053                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3054                         RETURN(rc);
3055                 }
3056         }
3057
3058         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3059                   cmd);
3060
3061         osc_oap_to_pending(oap);
3062         loi_list_maint(cli, loi);
3063         if (!osc_max_rpc_in_flight(cli, loi) &&
3064             lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
3065                 LASSERT(cli->cl_writeback_work != NULL);
3066                 rc = ptlrpcd_queue_work(cli->cl_writeback_work);
3067
3068                 CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
3069                        cli, rc);
3070         }
3071         client_obd_list_unlock(&cli->cl_loi_list_lock);
3072
3073         RETURN(0);
3074 }
3075
3076 /* aka (~was & now & flag), but this is more clear :) */
3077 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
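/*
 * SETTING() as a worked truth table for a single flag bit:
 *
 *         was & flag   now & flag   SETTING(was, now, flag)
 *             0            0                  0
 *             0            1                  1   <- bit is being set now
 *             1            0                  0
 *             1            1                  0
 */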
3078
3079 int osc_set_async_flags_base(struct client_obd *cli,
3080                              struct lov_oinfo *loi, struct osc_async_page *oap,
3081                              obd_flag async_flags)
3082 {
3083         struct loi_oap_pages *lop;
3084         int flags = 0;
3085         ENTRY;
3086
3087         LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3088
3089         if (oap->oap_cmd & OBD_BRW_WRITE) {
3090                 lop = &loi->loi_write_lop;
3091         } else {
3092                 lop = &loi->loi_read_lop;
3093         }
3094
3095         if ((oap->oap_async_flags & async_flags) == async_flags)
3096                 RETURN(0);
3097
3098         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3099                 flags |= ASYNC_READY;
3100
3101         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3102             cfs_list_empty(&oap->oap_rpc_item)) {
3103                 if (oap->oap_async_flags & ASYNC_HP)
3104                         cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3105                 else
3106                         cfs_list_add_tail(&oap->oap_urgent_item,
3107                                           &lop->lop_urgent);
3108                 flags |= ASYNC_URGENT;
3109                 loi_list_maint(cli, loi);
3110         }
3111         cfs_spin_lock(&oap->oap_lock);
3112         oap->oap_async_flags |= flags;
3113         cfs_spin_unlock(&oap->oap_lock);
3114
3115         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3116                         oap->oap_async_flags);
3117         RETURN(0);
3118 }
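/*
 * Ordering note on the urgent-list handling above: an ASYNC_HP page is
 * cfs_list_add()ed at the head of lop_urgent while a merely urgent page
 * goes to the tail, so pages backing a blocked lock are picked up first
 * when the next RPC is assembled.
 */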
3119
3120 int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
3121                             struct lov_oinfo *loi, struct osc_async_page *oap)
3122 {
3123         struct client_obd *cli = &exp->exp_obd->u.cli;
3124         struct loi_oap_pages *lop;
3125         int rc = 0;
3126         ENTRY;
3127
3128         if (oap->oap_magic != OAP_MAGIC)
3129                 RETURN(-EINVAL);
3130
3131         if (loi == NULL)
3132                 loi = lsm->lsm_oinfo[0];
3133
3134         if (oap->oap_cmd & OBD_BRW_WRITE) {
3135                 lop = &loi->loi_write_lop;
3136         } else {
3137                 lop = &loi->loi_read_lop;
3138         }
3139
3140         client_obd_list_lock(&cli->cl_loi_list_lock);
3141
3142         if (!cfs_list_empty(&oap->oap_rpc_item))
3143                 GOTO(out, rc = -EBUSY);
3144
3145         osc_exit_cache(cli, oap, 0);
3146         osc_wake_cache_waiters(cli);
3147
3148         if (!cfs_list_empty(&oap->oap_urgent_item)) {
3149                 cfs_list_del_init(&oap->oap_urgent_item);
3150                 cfs_spin_lock(&oap->oap_lock);
3151                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3152                 cfs_spin_unlock(&oap->oap_lock);
3153         }
3154         if (!cfs_list_empty(&oap->oap_pending_item)) {
3155                 cfs_list_del_init(&oap->oap_pending_item);
3156                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3157         }
3158         loi_list_maint(cli, loi);
3159         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3160 out:
3161         client_obd_list_unlock(&cli->cl_loi_list_lock);
3162         RETURN(rc);
3163 }
3164
3165 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
3166                                         struct ldlm_enqueue_info *einfo)
3167 {
3168         void *data = einfo->ei_cbdata;
3169         int set = 0;
3170
3171         LASSERT(lock != NULL);
3172         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3173         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3174         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3175         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3176
3177         lock_res_and_lock(lock);
3178         cfs_spin_lock(&osc_ast_guard);
3179
3180         if (lock->l_ast_data == NULL)
3181                 lock->l_ast_data = data;
3182         if (lock->l_ast_data == data)
3183                 set = 1;
3184
3185         cfs_spin_unlock(&osc_ast_guard);
3186         unlock_res_and_lock(lock);
3187
3188         return set;
3189 }
3190
3191 static int osc_set_data_with_check(struct lustre_handle *lockh,
3192                                    struct ldlm_enqueue_info *einfo)
3193 {
3194         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3195         int set = 0;
3196
3197         if (lock != NULL) {
3198                 set = osc_set_lock_data_with_check(lock, einfo);
3199                 LDLM_LOCK_PUT(lock);
3200         } else
3201                 CERROR("lockh %p, data %p - client evicted?\n",
3202                        lockh, einfo->ei_cbdata);
3203         return set;
3204 }
3205
3206 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3207                              ldlm_iterator_t replace, void *data)
3208 {
3209         struct ldlm_res_id res_id;
3210         struct obd_device *obd = class_exp2obd(exp);
3211
3212         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3213         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3214         return 0;
3215 }
3216
3217 /* find any ldlm lock of the inode in osc
3218  * return 0    if no lock is found
3219  *        1    if one is found
3220  *      < 0    on error */
3221 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3222                            ldlm_iterator_t replace, void *data)
3223 {
3224         struct ldlm_res_id res_id;
3225         struct obd_device *obd = class_exp2obd(exp);
3226         int rc = 0;
3227
3228         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3229         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3230         if (rc == LDLM_ITER_STOP)
3231                 return(1);
3232         if (rc == LDLM_ITER_CONTINUE)
3233                 return(0);
3234         return(rc);
3235 }
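/*
 * Hypothetical usage sketch: the caller passes an iterator that returns
 * LDLM_ITER_STOP on the first matching lock, so
 *
 *         rc = osc_find_cbdata(exp, lsm, cb, data);
 *
 * yields 1 when the object still has a cached DLM lock; the 0/1/<0
 * convention above simply maps ldlm_resource_iterate()'s LDLM_ITER_STOP /
 * LDLM_ITER_CONTINUE results onto found / not found.
 */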
3236
3237 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3238                             obd_enqueue_update_f upcall, void *cookie,
3239                             int *flags, int agl, int rc)
3240 {
3241         int intent = *flags & LDLM_FL_HAS_INTENT;
3242         ENTRY;
3243
3244         if (intent) {
3245                 /* The request was created before ldlm_cli_enqueue call. */
3246                 if (rc == ELDLM_LOCK_ABORTED) {
3247                         struct ldlm_reply *rep;
3248                         rep = req_capsule_server_get(&req->rq_pill,
3249                                                      &RMF_DLM_REP);
3250
3251                         LASSERT(rep != NULL);
3252                         if (rep->lock_policy_res1)
3253                                 rc = rep->lock_policy_res1;
3254                 }
3255         }
3256
3257         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
3258             (rc == 0)) {
3259                 *flags |= LDLM_FL_LVB_READY;
3260                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3261                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3262         }
3263
3264         /* Call the update callback. */
3265         rc = (*upcall)(cookie, rc);
3266         RETURN(rc);
3267 }
3268
3269 static int osc_enqueue_interpret(const struct lu_env *env,
3270                                  struct ptlrpc_request *req,
3271                                  struct osc_enqueue_args *aa, int rc)
3272 {
3273         struct ldlm_lock *lock;
3274         struct lustre_handle handle;
3275         __u32 mode;
3276         struct ost_lvb *lvb;
3277         __u32 lvb_len;
3278         int *flags = aa->oa_flags;
3279
3280         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3281          * might be freed anytime after lock upcall has been called. */
3282         lustre_handle_copy(&handle, aa->oa_lockh);
3283         mode = aa->oa_ei->ei_mode;
3284
3285         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3286          * be valid. */
3287         lock = ldlm_handle2lock(&handle);
3288
3289         /* Take an additional reference so that a blocking AST that
3290          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3291          * to arrive after an upcall has been executed by
3292          * osc_enqueue_fini(). */
3293         ldlm_lock_addref(&handle, mode);
3294
3295         /* Let the CP AST grant the lock first. */
3296         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3297
3298         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
3299                 lvb = NULL;
3300                 lvb_len = 0;
3301         } else {
3302                 lvb = aa->oa_lvb;
3303                 lvb_len = sizeof(*aa->oa_lvb);
3304         }
3305
3306         /* Complete obtaining the lock procedure. */
3307         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3308                                    mode, flags, lvb, lvb_len, &handle, rc);
3309         /* Complete osc stuff. */
3310         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
3311                               flags, aa->oa_agl, rc);
3312
3313         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3314
3315         /* Release the lock for async request. */
3316         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3317                 /*
3318                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3319                  * not already released by
3320                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3321                  */
3322                 ldlm_lock_decref(&handle, mode);
3323
3324         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3325                  aa->oa_lockh, req, aa);
3326         ldlm_lock_decref(&handle, mode);
3327         LDLM_LOCK_PUT(lock);
3328         return rc;
3329 }
3330
3331 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3332                         struct lov_oinfo *loi, int flags,
3333                         struct ost_lvb *lvb, __u32 mode, int rc)
3334 {
3335         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3336
3337         if (rc == ELDLM_OK) {
3338                 __u64 tmp;
3339
3340                 LASSERT(lock != NULL);
3341                 loi->loi_lvb = *lvb;
3342                 tmp = loi->loi_lvb.lvb_size;
3343                 /* Extend KMS up to the end of this lock and no further.
3344                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3345                 if (tmp > lock->l_policy_data.l_extent.end)
3346                         tmp = lock->l_policy_data.l_extent.end + 1;
3347                 if (tmp >= loi->loi_kms) {
3348                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3349                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3350                         loi_kms_set(loi, tmp);
3351                 } else {
3352                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3353                                    LPU64"; leaving kms="LPU64", end="LPU64,
3354                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3355                                    lock->l_policy_data.l_extent.end);
3356                 }
3357                 ldlm_lock_allow_match(lock);
3358         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3359                 LASSERT(lock != NULL);
3360                 loi->loi_lvb = *lvb;
3361                 ldlm_lock_allow_match(lock);
3362                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3363                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3364                 rc = ELDLM_OK;
3365         }
3366
3367         if (lock != NULL) {
3368                 if (rc != ELDLM_OK)
3369                         ldlm_lock_fail_match(lock);
3370
3371                 LDLM_LOCK_PUT(lock);
3372         }
3373 }
3374 EXPORT_SYMBOL(osc_update_enqueue);
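/*
 * Worked example for the KMS update above (hypothetical values): a granted
 * lock on extent [0, 1048575] returned with lvb_size == 2000000 gives
 * tmp = min(2000000, 1048575 + 1) = 1048576, so kms is raised exactly to
 * the end of the locked extent: the client may trust the known minimum
 * size only as far as the region it actually holds a lock on.
 */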
3375
3376 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3377
3378 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
3379  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3380  * other synchronous requests; however, keeping some locks while trying to
3381  * obtain others may take a considerable amount of time in the case of OST
3382  * failure, and when other sync requests do not get a lock released by a
3383  * client, that client is excluded from the cluster. Such scenarios make life
3384  * difficult, so release locks just after they are obtained. */
3385 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3386                      int *flags, ldlm_policy_data_t *policy,
3387                      struct ost_lvb *lvb, int kms_valid,
3388                      obd_enqueue_update_f upcall, void *cookie,
3389                      struct ldlm_enqueue_info *einfo,
3390                      struct lustre_handle *lockh,
3391                      struct ptlrpc_request_set *rqset, int async, int agl)
3392 {
3393         struct obd_device *obd = exp->exp_obd;
3394         struct ptlrpc_request *req = NULL;
3395         int intent = *flags & LDLM_FL_HAS_INTENT;
3396         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
3397         ldlm_mode_t mode;
3398         int rc;
3399         ENTRY;
3400
3401         /* Filesystem lock extents are extended to page boundaries so that
3402          * dealing with the page cache is a little smoother.  */
3403         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3404         policy->l_extent.end |= ~CFS_PAGE_MASK;
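        /*
         * Worked example, assuming 4 KB pages (~CFS_PAGE_MASK == 0xfff):
         * a request for bytes [5000, 9000] is widened to [4096, 12287];
         * start is rounded down to its page boundary and end up to the
         * last byte of its page, so the lock covers whole cache pages.
         */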
3405
3406         /*
3407          * kms is not valid when either object is completely fresh (so that no
3408          * locks are cached), or object was evicted. In the latter case cached
3409          * lock cannot be used, because it would prime inode state with
3410          * potentially stale LVB.
3411          */
3412         if (!kms_valid)
3413                 goto no_match;
3414
3415         /* Next, search for already existing extent locks that will cover us */
3416         /* If we're trying to read, we also search for an existing PW lock.  The
3417          * VFS and page cache already protect us locally, so lots of readers/
3418          * writers can share a single PW lock.
3419          *
3420          * There are problems with conversion deadlocks, so instead of
3421          * converting a read lock to a write lock, we'll just enqueue a new
3422          * one.
3423          *
3424          * At some point we should cancel the read lock instead of making them
3425          * send us a blocking callback, but there are problems with canceling
3426          * locks out from other users right now, too. */
3427         mode = einfo->ei_mode;
3428         if (einfo->ei_mode == LCK_PR)
3429                 mode |= LCK_PW;
3430         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
3431                                einfo->ei_type, policy, mode, lockh, 0);
3432         if (mode) {
3433                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3434
3435                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
3436                         /* For AGL, if enqueue RPC is sent but the lock is not
3437                          * granted, then skip processing this stripe.
3438                          * Return -ECANCELED to tell the caller. */
3439                         ldlm_lock_decref(lockh, mode);
3440                         LDLM_LOCK_PUT(matched);
3441                         RETURN(-ECANCELED);
3442                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
3443                         *flags |= LDLM_FL_LVB_READY;
3444                         /* addref the lock only if not async requests and PW
3445                          * lock is matched whereas we asked for PR. */
3446                         if (!rqset && einfo->ei_mode != mode)
3447                                 ldlm_lock_addref(lockh, LCK_PR);
3448                         if (intent) {
3449                                 /* I would like to be able to ASSERT here that
3450                                  * rss <= kms, but I can't, for reasons which
3451                                  * are explained in lov_enqueue() */
3452                         }
3453
3454                         /* We already have a lock, and it's referenced */
3455                         (*upcall)(cookie, ELDLM_OK);
3456
3457                         if (einfo->ei_mode != mode)
3458                                 ldlm_lock_decref(lockh, LCK_PW);
3459                         else if (rqset)
3460                                 /* For async requests, decref the lock. */
3461                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3462                         LDLM_LOCK_PUT(matched);
3463                         RETURN(ELDLM_OK);
3464                 } else {
3465                         ldlm_lock_decref(lockh, mode);
3466                         LDLM_LOCK_PUT(matched);
3467                 }
3468         }
3469
3470  no_match:
3471         if (intent) {
3472                 CFS_LIST_HEAD(cancels);
3473                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3474                                            &RQF_LDLM_ENQUEUE_LVB);
3475                 if (req == NULL)
3476                         RETURN(-ENOMEM);
3477
3478                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3479                 if (rc) {
3480                         ptlrpc_request_free(req);
3481                         RETURN(rc);
3482                 }
3483
3484                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3485                                      sizeof(*lvb));
3486                 ptlrpc_request_set_replen(req);
3487         }
3488
3489         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3490         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3491
3492         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3493                               sizeof(*lvb), lockh, async);
3494         if (rqset) {
3495                 if (!rc) {
3496                         struct osc_enqueue_args *aa;
3497                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3498                         aa = ptlrpc_req_async_args(req);
3499                         aa->oa_ei = einfo;
3500                         aa->oa_exp = exp;
3501                         aa->oa_flags  = flags;
3502                         aa->oa_upcall = upcall;
3503                         aa->oa_cookie = cookie;
3504                         aa->oa_lvb    = lvb;
3505                         aa->oa_lockh  = lockh;
3506                         aa->oa_agl    = !!agl;
3507
3508                         req->rq_interpret_reply =
3509                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3510                         if (rqset == PTLRPCD_SET)
3511                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3512                         else
3513                                 ptlrpc_set_add_req(rqset, req);
3514                 } else if (intent) {
3515                         ptlrpc_req_finished(req);
3516                 }
3517                 RETURN(rc);
3518         }
3519
3520         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
3521         if (intent)
3522                 ptlrpc_req_finished(req);
3523
3524         RETURN(rc);
3525 }
3526
3527 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3528                        struct ldlm_enqueue_info *einfo,
3529                        struct ptlrpc_request_set *rqset)
3530 {
3531         struct ldlm_res_id res_id;
3532         int rc;
3533         ENTRY;
3534
3535         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3536                            oinfo->oi_md->lsm_object_seq, &res_id);
3537
3538         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3539                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3540                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3541                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3542                               rqset, rqset != NULL, 0);
3543         RETURN(rc);
3544 }
3545
3546 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3547                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3548                    int *flags, void *data, struct lustre_handle *lockh,
3549                    int unref)
3550 {
3551         struct obd_device *obd = exp->exp_obd;
3552         int lflags = *flags;
3553         ldlm_mode_t rc;
3554         ENTRY;
3555
3556         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3557                 RETURN(-EIO);
3558
3559         /* Filesystem lock extents are extended to page boundaries so that
3560          * dealing with the page cache is a little smoother */
3561         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3562         policy->l_extent.end |= ~CFS_PAGE_MASK;
3563
3564         /* Next, search for already existing extent locks that will cover us */
3565         /* If we're trying to read, we also search for an existing PW lock.  The
3566          * VFS and page cache already protect us locally, so lots of readers/
3567          * writers can share a single PW lock. */
3568         rc = mode;
3569         if (mode == LCK_PR)
3570                 rc |= LCK_PW;
3571         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3572                              res_id, type, policy, rc, lockh, unref);
3573         if (rc) {
3574                 if (data != NULL) {
3575                         if (!osc_set_data_with_check(lockh, data)) {
3576                                 if (!(lflags & LDLM_FL_TEST_LOCK))
3577                                         ldlm_lock_decref(lockh, rc);
3578                                 RETURN(0);
3579                         }
3580                 }
3581                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3582                         ldlm_lock_addref(lockh, LCK_PR);
3583                         ldlm_lock_decref(lockh, LCK_PW);
3584                 }
3585                 RETURN(rc);
3586         }
3587         RETURN(rc);
3588 }
3589
3590 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3591 {
3592         ENTRY;
3593
3594         if (unlikely(mode == LCK_GROUP))
3595                 ldlm_lock_decref_and_cancel(lockh, mode);
3596         else
3597                 ldlm_lock_decref(lockh, mode);
3598
3599         RETURN(0);
3600 }
3601
3602 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3603                       __u32 mode, struct lustre_handle *lockh)
3604 {
3605         ENTRY;
3606         RETURN(osc_cancel_base(lockh, mode));
3607 }
3608
3609 static int osc_cancel_unused(struct obd_export *exp,
3610                              struct lov_stripe_md *lsm,
3611                              ldlm_cancel_flags_t flags,
3612                              void *opaque)
3613 {
3614         struct obd_device *obd = class_exp2obd(exp);
3615         struct ldlm_res_id res_id, *resp = NULL;
3616
3617         if (lsm != NULL) {
3618                 resp = osc_build_res_name(lsm->lsm_object_id,
3619                                           lsm->lsm_object_seq, &res_id);
3620         }
3621
3622         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3623 }
3624
3625 static int osc_statfs_interpret(const struct lu_env *env,
3626                                 struct ptlrpc_request *req,
3627                                 struct osc_async_args *aa, int rc)
3628 {
3629         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3630         struct obd_statfs *msfs;
3631         __u64 used;
3632         ENTRY;
3633
3634         if (rc == -EBADR)
3635                 /* The request has in fact never been sent
3636                  * due to issues at a higher level (LOV).
3637                  * Exit immediately since the caller is
3638                  * aware of the problem and takes care
3639                  * of the cleanup */
3640                  RETURN(rc);
3641
3642         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3643             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3644                 GOTO(out, rc = 0);
3645
3646         if (rc != 0)
3647                 GOTO(out, rc);
3648
3649         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3650         if (msfs == NULL) {
3651                 GOTO(out, rc = -EPROTO);
3652         }
3653
3654         /* Reinitialize the RDONLY and DEGRADED flags at the client
3655          * on each statfs, so they don't stay set permanently. */
3656         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3657
3658         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3659                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3660         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3661                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3662
3663         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3664                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3665         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3666                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3667
3668         /* Add a bit of hysteresis so this flag isn't continually flapping,
3669          * and ensure that new files don't get extremely fragmented due to
3670          * only a small amount of available space in the filesystem.
3671          * We want to set the NOSPC flag when there is less than ~0.1% free
3672          * and clear it when there is at least ~0.2% free space, so:
3673          *                   avail < ~0.1% max          max = avail + used
3674          *            1025 * avail < avail + used       used = blocks - free
3675          *            1024 * avail < used
3676          *            1024 * avail < blocks - free
3677          *                   avail < ((blocks - free) >> 10)
3678          *
3679          * On a very large disk, say 16 TB, 0.1% will be 16 GB. We don't want
3680          * to lose that much space, so in those cases we report no space left
3681          * if there is less than 1 GB left.                             */
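        /*
         * Worked example with hypothetical counts: if os_blocks - os_bfree
         * == 1,024,000 used blocks, used = min(1,024,000 >> 10, 1 << 30) =
         * 1,000 blocks, about 0.1% of the used space.  NOSPC is then set
         * once os_bavail drops below 1,000 (or fewer than 32 inodes remain
         * free) and cleared only when os_bavail exceeds 2,000 (used << 1)
         * with more than 64 free inodes, the 2:1 hysteresis derived above.
         */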
3682         used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3683         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3684                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3685                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3686         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3687                           (msfs->os_ffree > 64) &&
3688                           (msfs->os_bavail > (used << 1)))) {
3689                 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
3690                                              OSCC_FLAG_NOSPC_BLK);
3691         }
3692
3693         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3694                      (msfs->os_bavail < used)))
3695                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
3696
3697         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3698
3699         *aa->aa_oi->oi_osfs = *msfs;
3700 out:
3701         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3702         RETURN(rc);
3703 }
3704
3705 static int osc_statfs_async(struct obd_export *exp,
3706                             struct obd_info *oinfo, __u64 max_age,
3707                             struct ptlrpc_request_set *rqset)
3708 {
3709         struct obd_device     *obd = class_exp2obd(exp);
3710         struct ptlrpc_request *req;
3711         struct osc_async_args *aa;
3712         int                    rc;
3713         ENTRY;
3714
3715         /* We could possibly pass max_age in the request (as an absolute
3716          * timestamp or a "seconds.usec ago") so the target can avoid doing
3717          * extra calls into the filesystem if that isn't necessary (e.g.
3718          * during mount that would help a bit).  Having relative timestamps
3719          * is not so great if request processing is slow, while absolute
3720          * timestamps are not ideal because they need time synchronization. */
3721         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3722         if (req == NULL)
3723                 RETURN(-ENOMEM);
3724
3725         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3726         if (rc) {
3727                 ptlrpc_request_free(req);
3728                 RETURN(rc);
3729         }
3730         ptlrpc_request_set_replen(req);
3731         req->rq_request_portal = OST_CREATE_PORTAL;
3732         ptlrpc_at_set_req_timeout(req);
3733
3734         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3735                 /* procfs requests must not wait for statfs, to avoid deadlock */
3736                 req->rq_no_resend = 1;
3737                 req->rq_no_delay = 1;
3738         }
3739
3740         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3741         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3742         aa = ptlrpc_req_async_args(req);
3743         aa->aa_oi = oinfo;
3744
3745         ptlrpc_set_add_req(rqset, req);
3746         RETURN(0);
3747 }
3748
3749 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
3750                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
3751 {
3752         struct obd_device     *obd = class_exp2obd(exp);
3753         struct obd_statfs     *msfs;
3754         struct ptlrpc_request *req;
3755         struct obd_import     *imp = NULL;
3756         int rc;
3757         ENTRY;
3758
3759         /* Since the request might also come from lprocfs, we need to
3760          * sync this with client_disconnect_export (bug 15684). */
3761         cfs_down_read(&obd->u.cli.cl_sem);
3762         if (obd->u.cli.cl_import)
3763                 imp = class_import_get(obd->u.cli.cl_import);
3764         cfs_up_read(&obd->u.cli.cl_sem);
3765         if (!imp)
3766                 RETURN(-ENODEV);
3767
3768         /* We could possibly pass max_age in the request (as an absolute
3769          * timestamp or a "seconds.usec ago") so the target can avoid doing
3770          * extra calls into the filesystem if that isn't necessary (e.g.
3771          * during mount that would help a bit).  Having relative timestamps
3772          * is not so great if request processing is slow, while absolute
3773          * timestamps are not ideal because they need time synchronization. */
3774         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3775
3776         class_import_put(imp);
3777
3778         if (req == NULL)
3779                 RETURN(-ENOMEM);
3780
3781         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3782         if (rc) {
3783                 ptlrpc_request_free(req);
3784                 RETURN(rc);
3785         }
3786         ptlrpc_request_set_replen(req);
3787         req->rq_request_portal = OST_CREATE_PORTAL;
3788         ptlrpc_at_set_req_timeout(req);
3789
3790         if (flags & OBD_STATFS_NODELAY) {
3791                 /* procfs requests must not wait for statfs, to avoid deadlock */
3792                 req->rq_no_resend = 1;
3793                 req->rq_no_delay = 1;
3794         }
3795
3796         rc = ptlrpc_queue_wait(req);
3797         if (rc)
3798                 GOTO(out, rc);
3799
3800         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3801         if (msfs == NULL) {
3802                 GOTO(out, rc = -EPROTO);
3803         }
3804
3805         *osfs = *msfs;
3806
3807         EXIT;
3808  out:
3809         ptlrpc_req_finished(req);
3810         return rc;
3811 }
3812
3813 /* Retrieve object striping information.
3814  *
3815  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3816  * the maximum number of OST indices which will fit in the user buffer.
3817  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3818  */
3819 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3820 {
3821         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3822         struct lov_user_md_v3 lum, *lumk;
3823         struct lov_user_ost_data_v1 *lmm_objects;
3824         int rc = 0, lum_size;
3825         ENTRY;
3826
3827         if (!lsm)
3828                 RETURN(-ENODATA);
3829
3830         /* we only need the header part from user space to get lmm_magic and
3831          * lmm_stripe_count, (the header part is common to v1 and v3) */
3832         lum_size = sizeof(struct lov_user_md_v1);
3833         if (cfs_copy_from_user(&lum, lump, lum_size))
3834                 RETURN(-EFAULT);
3835
3836         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3837             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3838                 RETURN(-EINVAL);
3839
3840         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3841         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3842         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3843         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3844
3845         /* we can use lov_mds_md_size() to compute lum_size
3846          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3847         if (lum.lmm_stripe_count > 0) {
3848                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3849                 OBD_ALLOC(lumk, lum_size);
3850                 if (!lumk)
3851                         RETURN(-ENOMEM);
3852
3853                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3854                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3855                 else
3856                         lmm_objects = &(lumk->lmm_objects[0]);
3857                 lmm_objects->l_object_id = lsm->lsm_object_id;
3858         } else {
3859                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3860                 lumk = &lum;
3861         }
3862
3863         lumk->lmm_object_id = lsm->lsm_object_id;
3864         lumk->lmm_object_seq = lsm->lsm_object_seq;
3865         lumk->lmm_stripe_count = 1;
3866
3867         if (cfs_copy_to_user(lump, lumk, lum_size))
3868                 rc = -EFAULT;
3869
3870         if (lumk != &lum)
3871                 OBD_FREE(lumk, lum_size);
3872
3873         RETURN(rc);
3874 }
3875
3876
3877 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3878                          void *karg, void *uarg)
3879 {
3880         struct obd_device *obd = exp->exp_obd;
3881         struct obd_ioctl_data *data = karg;
3882         int err = 0;
3883         ENTRY;
3884
3885         if (!cfs_try_module_get(THIS_MODULE)) {
3886                 CERROR("Can't get module. Is it alive?");
3887                 return -EINVAL;
3888         }
3889         switch (cmd) {
3890         case OBD_IOC_LOV_GET_CONFIG: {
3891                 char *buf;
3892                 struct lov_desc *desc;
3893                 struct obd_uuid uuid;
3894
3895                 buf = NULL;
3896                 len = 0;
3897                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3898                         GOTO(out, err = -EINVAL);
3899
3900                 data = (struct obd_ioctl_data *)buf;
3901
3902                 if (sizeof(*desc) > data->ioc_inllen1) {
3903                         obd_ioctl_freedata(buf, len);
3904                         GOTO(out, err = -EINVAL);
3905                 }
3906
3907                 if (data->ioc_inllen2 < sizeof(uuid)) {
3908                         obd_ioctl_freedata(buf, len);
3909                         GOTO(out, err = -EINVAL);
3910                 }
3911
3912                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3913                 desc->ld_tgt_count = 1;
3914                 desc->ld_active_tgt_count = 1;
3915                 desc->ld_default_stripe_count = 1;
3916                 desc->ld_default_stripe_size = 0;
3917                 desc->ld_default_stripe_offset = 0;
3918                 desc->ld_pattern = 0;
3919                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3920
3921                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3922
3923                 err = cfs_copy_to_user((void *)uarg, buf, len);
3924                 if (err)
3925                         err = -EFAULT;
3926                 obd_ioctl_freedata(buf, len);
3927                 GOTO(out, err);
3928         }
3929         case LL_IOC_LOV_SETSTRIPE:
3930                 err = obd_alloc_memmd(exp, karg);
3931                 if (err > 0)
3932                         err = 0;
3933                 GOTO(out, err);
3934         case LL_IOC_LOV_GETSTRIPE:
3935                 err = osc_getstripe(karg, uarg);
3936                 GOTO(out, err);
3937         case OBD_IOC_CLIENT_RECOVER:
3938                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3939                                             data->ioc_inlbuf1, 0);
3940                 if (err > 0)
3941                         err = 0;
3942                 GOTO(out, err);
3943         case IOC_OSC_SET_ACTIVE:
3944                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3945                                                data->ioc_offset);
3946                 GOTO(out, err);
3947         case OBD_IOC_POLL_QUOTACHECK:
3948                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3949                 GOTO(out, err);
3950         case OBD_IOC_PING_TARGET:
3951                 err = ptlrpc_obd_ping(obd);
3952                 GOTO(out, err);
3953         default:
3954                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3955                        cmd, cfs_curproc_comm());
3956                 GOTO(out, err = -ENOTTY);
3957         }
3958 out:
3959         cfs_module_put(THIS_MODULE);
3960         return err;
3961 }
3962
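/* Handle obd_get_info() queries.  KEY_LOCK_TO_STRIPE is answered locally
 * (a plain OSC object is always stripe 0); KEY_LAST_ID and KEY_FIEMAP are
 * forwarded to the OST as OST_GET_INFO RPCs and the reply is copied back
 * into *val.
 *
 * Usage sketch (not part of this file; it assumes the obd_get_info()
 * wrapper passes its arguments through to this method unchanged):
 *
 *      obd_id last_id = 0;
 *      __u32 vallen = sizeof(last_id);
 *      rc = obd_get_info(env, exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
 *                        &vallen, &last_id, NULL);
 */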
3963 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3964                         obd_count keylen, void *key, __u32 *vallen, void *val,
3965                         struct lov_stripe_md *lsm)
3966 {
3967         ENTRY;
3968         if (!vallen || !val)
3969                 RETURN(-EFAULT);
3970
3971         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3972                 __u32 *stripe = val;
3973                 *vallen = sizeof(*stripe);
3974                 *stripe = 0;
3975                 RETURN(0);
3976         } else if (KEY_IS(KEY_LAST_ID)) {
3977                 struct ptlrpc_request *req;
3978                 obd_id                *reply;
3979                 char                  *tmp;
3980                 int                    rc;
3981
3982                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3983                                            &RQF_OST_GET_INFO_LAST_ID);
3984                 if (req == NULL)
3985                         RETURN(-ENOMEM);
3986
3987                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3988                                      RCL_CLIENT, keylen);
3989                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3990                 if (rc) {
3991                         ptlrpc_request_free(req);
3992                         RETURN(rc);
3993                 }
3994
3995                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3996                 memcpy(tmp, key, keylen);
3997
3998                 req->rq_no_delay = req->rq_no_resend = 1;
3999                 ptlrpc_request_set_replen(req);
4000                 rc = ptlrpc_queue_wait(req);
4001                 if (rc)
4002                         GOTO(out, rc);
4003
4004                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
4005                 if (reply == NULL)
4006                         GOTO(out, rc = -EPROTO);
4007
4008                 *((obd_id *)val) = *reply;
4009         out:
4010                 ptlrpc_req_finished(req);
4011                 RETURN(rc);
4012         } else if (KEY_IS(KEY_FIEMAP)) {
4013                 struct ptlrpc_request *req;
4014                 struct ll_user_fiemap *reply;
4015                 char *tmp;
4016                 int rc;
4017
4018                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
4019                                            &RQF_OST_GET_INFO_FIEMAP);
4020                 if (req == NULL)
4021                         RETURN(-ENOMEM);
4022
4023                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
4024                                      RCL_CLIENT, keylen);
4025                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
4026                                      RCL_CLIENT, *vallen);
4027                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
4028                                      RCL_SERVER, *vallen);
4029
4030                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
4031                 if (rc) {
4032                         ptlrpc_request_free(req);
4033                         RETURN(rc);
4034                 }
4035
4036                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
4037                 memcpy(tmp, key, keylen);
4038                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4039                 memcpy(tmp, val, *vallen);
4040
4041                 ptlrpc_request_set_replen(req);
4042                 rc = ptlrpc_queue_wait(req);
4043                 if (rc)
4044                         GOTO(out1, rc);
4045
4046                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4047                 if (reply == NULL)
4048                         GOTO(out1, rc = -EPROTO);
4049
4050                 memcpy(val, reply, *vallen);
4051         out1:
4052                 ptlrpc_req_finished(req);
4053
4054                 RETURN(rc);
4055         }
4056
4057         RETURN(-EINVAL);
4058 }
4059
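/* Called when an MDS connects to an OST: wire up the llog initiator for
 * the MDS->OST originator context and mark the import as pingable with
 * server-side timeout handling. */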
4060 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4061 {
4062         struct llog_ctxt *ctxt;
4063         int rc = 0;
4064         ENTRY;
4065
4066         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4067         if (ctxt) {
4068                 rc = llog_initiator_connect(ctxt);
4069                 llog_ctxt_put(ctxt);
4070         } else {
4071                 /* XXX return an error? skip setting the flags below? */
4072         }
4073
4074         cfs_spin_lock(&imp->imp_lock);
4075         imp->imp_server_timeout = 1;
4076         imp->imp_pingable = 1;
4077         cfs_spin_unlock(&imp->imp_lock);
4078         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
4079
4080         RETURN(rc);
4081 }
4082
4083 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4084                                           struct ptlrpc_request *req,
4085                                           void *aa, int rc)
4086 {
4087         ENTRY;
4088         if (rc != 0)
4089                 RETURN(rc);
4090
4091         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
4092 }
4093
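/* Handle obd_set_info_async() updates.  KEY_NEXT_ID, KEY_CHECKSUM,
 * KEY_SPTLRPC_CONF and KEY_FLUSH_CTX are applied locally; all other keys
 * are packed into an OST_SET_INFO RPC.  KEY_GRANT_SHRINK requests are
 * queued on ptlrpcd; every other forwarded key requires a request set. */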
4094 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
4095                               obd_count keylen, void *key, obd_count vallen,
4096                               void *val, struct ptlrpc_request_set *set)
4097 {
4098         struct ptlrpc_request *req;
4099         struct obd_device     *obd = exp->exp_obd;
4100         struct obd_import     *imp = class_exp2cliimp(exp);
4101         char                  *tmp;
4102         int                    rc;
4103         ENTRY;
4104
4105         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4106
4107         if (KEY_IS(KEY_NEXT_ID)) {
4108                 obd_id new_val;
4109                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4110
4111                 if (vallen != sizeof(obd_id))
4112                         RETURN(-ERANGE);
4113                 if (val == NULL)
4114                         RETURN(-EINVAL);
4118
4119                 /* Avoid a race between allocating a new object and
4120                  * setting the next id from the ll_sync thread. */
4121                 cfs_spin_lock(&oscc->oscc_lock);
4122                 new_val = *((obd_id*)val) + 1;
4123                 if (new_val > oscc->oscc_next_id)
4124                         oscc->oscc_next_id = new_val;
4125                 cfs_spin_unlock(&oscc->oscc_lock);
4126                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4127                        exp->exp_obd->obd_name,
4128                        obd->u.cli.cl_oscc.oscc_next_id);
4129
4130                 RETURN(0);
4131         }
4132
4133         if (KEY_IS(KEY_CHECKSUM)) {
4134                 if (vallen != sizeof(int))
4135                         RETURN(-EINVAL);
4136                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4137                 RETURN(0);
4138         }
4139
4140         if (KEY_IS(KEY_SPTLRPC_CONF)) {
4141                 sptlrpc_conf_client_adapt(obd);
4142                 RETURN(0);
4143         }
4144
4145         if (KEY_IS(KEY_FLUSH_CTX)) {
4146                 sptlrpc_import_flush_my_ctx(imp);
4147                 RETURN(0);
4148         }
4149
4150         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4151                 RETURN(-EINVAL);
4152
4153         /* We pass all other commands directly to the OST. Since nobody
4154            calls osc methods directly and everybody is supposed to go
4155            through LOV, we assume LOV validated the values for us.
4156            The only recognised values so far are evict_by_nid and mds_conn.
4157            Even if something bad gets through, we'd get a -EINVAL from the
4158            OST anyway. */
4159
4160         if (KEY_IS(KEY_GRANT_SHRINK))
4161                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4162         else
4163                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4164
4165         if (req == NULL)
4166                 RETURN(-ENOMEM);
4167
4168         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4169                              RCL_CLIENT, keylen);
4170         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4171                              RCL_CLIENT, vallen);
4172         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4173         if (rc) {
4174                 ptlrpc_request_free(req);
4175                 RETURN(rc);
4176         }
4177
4178         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4179         memcpy(tmp, key, keylen);
4180         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4181         memcpy(tmp, val, vallen);
4182
4183         if (KEY_IS(KEY_MDS_CONN)) {
4184                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4185
4186                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4187                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4188                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4189                 req->rq_no_delay = req->rq_no_resend = 1;
4190                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4191         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4192                 struct osc_grant_args *aa;
4193                 struct obdo *oa;
4194
4195                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4196                 aa = ptlrpc_req_async_args(req);
4197                 OBDO_ALLOC(oa);
4198                 if (!oa) {
4199                         ptlrpc_req_finished(req);
4200                         RETURN(-ENOMEM);
4201                 }
4202                 *oa = ((struct ost_body *)val)->oa;
4203                 aa->aa_oa = oa;
4204                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4205         }
4206
4207         ptlrpc_request_set_replen(req);
4208         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4209                 LASSERT(set != NULL);
4210                 ptlrpc_set_add_req(set, req);
4211                 ptlrpc_check_set(NULL, set);
4212         } else
4213                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
4214
4215         RETURN(0);
4216 }
4217
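/*
 * Usage sketch (not part of this file; it assumes the obd_set_info_async()
 * wrapper passes its arguments through to the method above unchanged).
 * A key that is handled locally, such as KEY_CHECKSUM, needs no request
 * set:
 *
 *      int on = 1;
 *      rc = obd_set_info_async(env, exp, sizeof(KEY_CHECKSUM), KEY_CHECKSUM,
 *                              sizeof(on), &on, NULL);
 *
 * Keys that are forwarded to the OST must supply a set, except for
 * KEY_GRANT_SHRINK, which is queued on ptlrpcd instead.
 */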
4218
4219 static struct llog_operations osc_size_repl_logops = {
4220         .lop_cancel = llog_obd_repl_cancel,
4221 };
4222
4223 static struct llog_operations osc_mds_ost_orig_logops;
4224
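/* Set up the two llog contexts used by the OSC: the MDS->OST originator
 * context and the size-changes replicator context.  If the second setup
 * fails, the first context is cleaned up again before returning. */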
4225 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4226                            struct obd_device *tgt, struct llog_catid *catid)
4227 {
4228         int rc;
4229         ENTRY;
4230
4231         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4232                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4233         if (rc) {
4234                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT: rc = %d\n", rc);
4235                 GOTO(out, rc);
4236         }
4237
4238         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4239                         NULL, &osc_size_repl_logops);
4240         if (rc) {
4241                 struct llog_ctxt *ctxt =
4242                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4243                 if (ctxt)
4244                         llog_cleanup(ctxt);
4245                 CERROR("failed LLOG_SIZE_REPL_CTXT: rc = %d\n", rc);
4246         }
4247         GOTO(out, rc);
4248 out:
4249         if (rc) {
4250                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4251                        obd->obd_name, tgt->obd_name, catid, rc);
4252                 CERROR("logid "LPX64":0x%x\n",
4253                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4254         }
4255         return rc;
4256 }
4257
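/* Fetch the catalog id for this target from the CATLIST file, initialize
 * the llog contexts with it, and write the catalog id back, all under
 * the olg_cat_processing mutex. */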
4258 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4259                          struct obd_device *disk_obd, int *index)
4260 {
4261         struct llog_catid catid;
4262         static char name[32] = CATLIST;
4263         int rc;
4264         ENTRY;
4265
4266         LASSERT(olg == &obd->obd_olg);
4267
4268         cfs_mutex_lock(&olg->olg_cat_processing);
4269         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4270         if (rc) {
4271                 CERROR("%s: llog_get_cat_list failed: rc = %d\n",
                            obd->obd_name, rc);
4272                 GOTO(out, rc);
4273         }
4274
4275         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4276                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4277                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4278
4279         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4280         if (rc) {
4281                 CERROR("%s: __osc_llog_init failed: rc = %d\n",
                            obd->obd_name, rc);
4282                 GOTO(out, rc);
4283         }
4284
4285         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4286         if (rc) {
4287                 CERROR("%s: llog_put_cat_list failed: rc = %d\n",
                            obd->obd_name, rc);
4288                 GOTO(out, rc);
4289         }
4290
4291  out:
4292         cfs_mutex_unlock(&olg->olg_cat_processing);
4293
4294         return rc;
4295 }
4296
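/* Tear down both llog contexts; the first error encountered is returned. */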
4297 static int osc_llog_finish(struct obd_device *obd, int count)
4298 {
4299         struct llog_ctxt *ctxt;
4300         int rc = 0, rc2 = 0;
4301         ENTRY;
4302
4303         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4304         if (ctxt)
4305                 rc = llog_cleanup(ctxt);
4306
4307         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4308         if (ctxt)
4309                 rc2 = llog_cleanup(ctxt);
4310         if (!rc)
4311                 rc = rc2;
4312
4313         RETURN(rc);
4314 }
4315
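/* On reconnect, ask the OST for a grant matching what this client still
 * accounts for (available grant plus dirty pages), or two full RPCs worth
 * as a minimum, and reset the lost-grant counter. */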
4316 static int osc_reconnect(const struct lu_env *env,
4317                          struct obd_export *exp, struct obd_device *obd,
4318                          struct obd_uuid *cluuid,
4319                          struct obd_connect_data *data,
4320                          void *localdata)
4321 {
4322         struct client_obd *cli = &obd->u.cli;
4323
4324         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4325                 long lost_grant;
4326
4327                 client_obd_list_lock(&cli->cl_loi_list_lock);
4328                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4329                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4330                 lost_grant = cli->cl_lost_grant;
4331                 cli->cl_lost_grant = 0;
4332                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4333
4334                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4335                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4336                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4337                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4338                        " ocd_grant: %d\n", data->ocd_connect_flags,
4339                        data->ocd_version, data->ocd_grant);
4340         }
4341
4342         RETURN(0);
4343 }
4344
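/* Disconnect from the OST: on the last connection reference, flush any
 * pending llog cancels to the target, then tear down the export.  The
 * client is dropped from the grant shrink list only once the import is
 * gone (see the BUG18662 comment below). */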
4345 static int osc_disconnect(struct obd_export *exp)
4346 {
4347         struct obd_device *obd = class_exp2obd(exp);
4348         struct llog_ctxt  *ctxt;
4349         int rc;
4350
4351         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4352         if (ctxt) {
4353                 if (obd->u.cli.cl_conn_count == 1) {
4354                         /* Flush any remaining cancel messages out to the
4355                          * target */
4356                         llog_sync(ctxt, exp);
4357                 }
4358                 llog_ctxt_put(ctxt);
4359         } else {
4360                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4361                        obd);
4362         }
4363
4364         rc = client_disconnect_export(exp);
4365         /**
4366          * Initially we put del_shrink_grant before disconnect_export, but it
4367          * causes the following problem if setup (connect) and cleanup
4368          * (disconnect) are tangled together.
4369          *      connect p1                     disconnect p2
4370          *   ptlrpc_connect_import
4371          *     ...............               class_manual_cleanup
4372          *                                     osc_disconnect
4373          *                                     del_shrink_grant
4374          *   ptlrpc_connect_interrupt
4375          *     init_grant_shrink
4376          *   add this client to shrink list
4377          *                                      cleanup_osc
4378          * Bang! the pinger triggers the shrink.
4379          * So the osc should be removed from the shrink list only after we
4380          * are sure the import has been destroyed. BUG18662
4381          */
4382         if (obd->u.cli.cl_import == NULL)
4383                 osc_del_shrink_grant(&obd->u.cli);
4384         return rc;
4385 }
4386
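/* React to import state changes: zero the grants on disconnect, fail
 * cached pages and flush local locks on invalidation, clear the no-space
 * flags when the import becomes active again, pick up grant and portal
 * settings from the connect data, and forward (de)activation events to
 * the observer. */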
4387 static int osc_import_event(struct obd_device *obd,
4388                             struct obd_import *imp,
4389                             enum obd_import_event event)
4390 {
4391         struct client_obd *cli;
4392         int rc = 0;
4393
4394         ENTRY;
4395         LASSERT(imp->imp_obd == obd);
4396
4397         switch (event) {
4398         case IMP_EVENT_DISCON: {
4399                 /* Only do this on the MDS OSCs */
4400                 if (imp->imp_server_timeout) {
4401                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4402
4403                         cfs_spin_lock(&oscc->oscc_lock);
4404                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4405                         cfs_spin_unlock(&oscc->oscc_lock);
4406                 }
4407                 cli = &obd->u.cli;
4408                 client_obd_list_lock(&cli->cl_loi_list_lock);
4409                 cli->cl_avail_grant = 0;
4410                 cli->cl_lost_grant = 0;
4411                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4412                 break;
4413         }
4414         case IMP_EVENT_INACTIVE: {
4415                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4416                 break;
4417         }
4418         case IMP_EVENT_INVALIDATE: {
4419                 struct ldlm_namespace *ns = obd->obd_namespace;
4420                 struct lu_env         *env;
4421                 int                    refcheck;
4422
4423                 env = cl_env_get(&refcheck);
4424                 if (!IS_ERR(env)) {
4425                         /* Reset grants */
4426                         cli = &obd->u.cli;
4427                         client_obd_list_lock(&cli->cl_loi_list_lock);
4428                         /* all pages go to failing RPCs due to the invalid
4429                          * import */
4430                         osc_check_rpcs(env, cli);
4431                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4432
4433                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4434                         cl_env_put(env, &refcheck);
4435                 } else
4436                         rc = PTR_ERR(env);
4437                 break;
4438         }
4439         case IMP_EVENT_ACTIVE: {
4440                 /* Only do this on the MDS OSCs */
4441                 if (imp->imp_server_timeout) {
4442                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4443
4444                         cfs_spin_lock(&oscc->oscc_lock);
4445                         oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
4446                                               OSCC_FLAG_NOSPC_BLK);
4447                         cfs_spin_unlock(&oscc->oscc_lock);
4448                 }
4449                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4450                 break;
4451         }
4452         case IMP_EVENT_OCD: {
4453                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4454
4455                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4456                         osc_init_grant(&obd->u.cli, ocd);
4457
4458                 /* See bug 7198 */
4459                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4460                         imp->imp_client->cli_request_portal =
                                     OST_REQUEST_PORTAL;
4461
4462                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4463                 break;
4464         }
4465         case IMP_EVENT_DEACTIVATE: {
4466                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
4467                 break;
4468         }
4469         case IMP_EVENT_ACTIVATE: {
4470                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
4471                 break;
4472         }
4473         default:
4474                 CERROR("Unknown import event %d\n", event);
4475                 LBUG();
4476         }
4477         RETURN(rc);
4478 }
4479
4480 /**
4481  * Determine whether the lock can be canceled before replaying the lock
4482  * during recovery, see bug16774 for detailed information.
4483  *
4484  * \retval zero the lock can't be canceled
4485  * \retval other ok to cancel
4486  */
4487 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4488 {
4489         check_res_locked(lock->l_resource);
4490
4491         /*
4492          * Cancel any unused extent lock granted in LCK_PR or LCK_CR mode.
4493          *
4494          * XXX as a future improvement, we could also cancel an unused write
4495          * lock if it has no dirty data and no active mmaps.
4496          */
4497         if (lock->l_resource->lr_type == LDLM_EXTENT &&
4498             (lock->l_granted_mode == LCK_PR ||
4499              lock->l_granted_mode == LCK_CR) &&
4500             (osc_dlm_lock_pageref(lock) == 0))
4501                 return 1;
4502
4503         return 0;
4504 }
4505
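/* Set up the OSC device: take a ptlrpcd reference, create the client
 * import, start the writeback work item, register lprocfs entries,
 * initialize the object creator, pre-allocate a small request pool (see
 * the comment below) and register osc_cancel_for_recovery() with the
 * namespace. */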
4506 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4507 {
4508         struct client_obd *cli = &obd->u.cli;
4509         int rc;
4510         ENTRY;
4511
4513         rc = ptlrpcd_addref();
4514         if (rc)
4515                 RETURN(rc);
4516
4517         rc = client_obd_setup(obd, lcfg);
4518         if (rc == 0) {
4519                 void *handler;
4520                 handler = ptlrpcd_alloc_work(cli->cl_import,
4521                                              brw_queue_work, cli);
4522                 if (!IS_ERR(handler))
4523                         cli->cl_writeback_work = handler;
4524                 else
4525                         rc = PTR_ERR(handler);
4526         }
4527
4528         if (rc == 0) {
4529                 struct lprocfs_static_vars lvars = { 0 };
4530
4531                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4532                 lprocfs_osc_init_vars(&lvars);
4533                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4534                         lproc_osc_attach_seqstat(obd);
4535                         sptlrpc_lprocfs_cliobd_attach(obd);
4536                         ptlrpc_lprocfs_register_obd(obd);
4537                 }
4538
4539                 oscc_init(obd);
4540                 /* We need to allocate a few more requests, because
4541                    brw_interpret tries to create new requests before freeing
4542                    previous ones. Ideally we would reserve 2x
4543                    max_rpcs_in_flight, but that might waste too much RAM,
4544                    so an extra 2 is a guess that should still work. */
4545                 cli->cl_import->imp_rq_pool =
4546                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4547                                             OST_MAXREQSIZE,
4548                                             ptlrpc_add_rqs_to_pool);
4549
4550                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4551
4552                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4553         }
4554
4555         if (rc)
4556                 ptlrpcd_decref();
4557         RETURN(rc);
4558 }
4559
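/* Two-stage pre-cleanup.  OBD_CLEANUP_EARLY deactivates the import and
 * stops pinging it; OBD_CLEANUP_EXPORTS waits for zombie exports to be
 * culled, stops the writeback work item and tears down the import,
 * lprocfs entries and llog contexts. */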
4560 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4561 {
4562         int rc = 0;
4563         ENTRY;
4564
4565         switch (stage) {
4566         case OBD_CLEANUP_EARLY: {
4567                 struct obd_import *imp;
4568                 imp = obd->u.cli.cl_import;
4569                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4570                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4571                 ptlrpc_deactivate_import(imp);
4572                 cfs_spin_lock(&imp->imp_lock);
4573                 imp->imp_pingable = 0;
4574                 cfs_spin_unlock(&imp->imp_lock);
4575                 break;
4576         }
4577         case OBD_CLEANUP_EXPORTS: {
4578                 struct client_obd *cli = &obd->u.cli;
4579                 /* LU-464
4580                  * for echo client, export may be on zombie list, wait for
4581                  * zombie thread to cull it, because cli.cl_import will be
4582                  * cleared in client_disconnect_export():
4583                  *   class_export_destroy() -> obd_cleanup() ->
4584                  *   echo_device_free() -> echo_client_cleanup() ->
4585                  *   obd_disconnect() -> osc_disconnect() ->
4586                  *   client_disconnect_export()
4587                  */
4588                 obd_zombie_barrier();
4589                 if (cli->cl_writeback_work) {
4590                         ptlrpcd_destroy_work(cli->cl_writeback_work);
4591                         cli->cl_writeback_work = NULL;
4592                 }
4593                 obd_cleanup_client_import(obd);
4594                 ptlrpc_lprocfs_unregister_obd(obd);
4595                 lprocfs_obd_cleanup(obd);
4596                 rc = obd_llog_finish(obd, 0);
4597                 if (rc != 0)
4598                         CERROR("failed to cleanup llog subsystems: rc = %d\n", rc);
4599                 break;
4600                 }
4601         }
4602         RETURN(rc);
4603 }
4604
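/* Final cleanup: release the quota cache, the generic client state and
 * our ptlrpcd reference. */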
4605 int osc_cleanup(struct obd_device *obd)
4606 {
4607         int rc;
4608
4609         ENTRY;
4610
4611         /* free memory of osc quota cache */
4612         osc_quota_cleanup(obd);
4613
4614         rc = client_obd_cleanup(obd);
4615
4616         ptlrpcd_decref();
4617         RETURN(rc);
4618 }
4619
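/* Process configuration log commands; everything is currently treated as
 * an "osc.*" proc parameter update. */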
4620 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4621 {
4622         struct lprocfs_static_vars lvars = { 0 };
4623         int rc = 0;
4624
4625         lprocfs_osc_init_vars(&lvars);
4626
4627         switch (lcfg->lcfg_command) {
4628         default:
4629                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4630                                               lcfg, obd);
4631                 if (rc > 0)
4632                         rc = 0;
4633                 break;
4634         }
4635
4636         return(rc);
4637 }
4638
4639 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4640 {
4641         return osc_process_config_base(obd, buf);
4642 }
4643
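/* Method table through which obdclass and upper layers (normally LOV)
 * drive this OSC; registered via class_register_type() in osc_init(). */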
4644 struct obd_ops osc_obd_ops = {
4645         .o_owner                = THIS_MODULE,
4646         .o_setup                = osc_setup,
4647         .o_precleanup           = osc_precleanup,
4648         .o_cleanup              = osc_cleanup,
4649         .o_add_conn             = client_import_add_conn,
4650         .o_del_conn             = client_import_del_conn,
4651         .o_connect              = client_connect_import,
4652         .o_reconnect            = osc_reconnect,
4653         .o_disconnect           = osc_disconnect,
4654         .o_statfs               = osc_statfs,
4655         .o_statfs_async         = osc_statfs_async,
4656         .o_packmd               = osc_packmd,
4657         .o_unpackmd             = osc_unpackmd,
4658         .o_precreate            = osc_precreate,
4659         .o_create               = osc_create,
4660         .o_create_async         = osc_create_async,
4661         .o_destroy              = osc_destroy,
4662         .o_getattr              = osc_getattr,
4663         .o_getattr_async        = osc_getattr_async,
4664         .o_setattr              = osc_setattr,
4665         .o_setattr_async        = osc_setattr_async,
4666         .o_brw                  = osc_brw,
4667         .o_punch                = osc_punch,
4668         .o_sync                 = osc_sync,
4669         .o_enqueue              = osc_enqueue,
4670         .o_change_cbdata        = osc_change_cbdata,
4671         .o_find_cbdata          = osc_find_cbdata,
4672         .o_cancel               = osc_cancel,
4673         .o_cancel_unused        = osc_cancel_unused,
4674         .o_iocontrol            = osc_iocontrol,
4675         .o_get_info             = osc_get_info,
4676         .o_set_info_async       = osc_set_info_async,
4677         .o_import_event         = osc_import_event,
4678         .o_llog_init            = osc_llog_init,
4679         .o_llog_finish          = osc_llog_finish,
4680         .o_process_config       = osc_process_config,
4681         .o_quotactl             = osc_quotactl,
4682         .o_quotacheck           = osc_quotacheck,
4683         .o_quota_adjust_qunit   = osc_quota_adjust_qunit,
4684 };
4685
4686 extern struct lu_kmem_descr osc_caches[];
4687 extern cfs_spinlock_t       osc_ast_guard;
4688 extern cfs_lock_class_key_t osc_ast_guard_class;
4689
4690 int __init osc_init(void)
4691 {
4692         struct lprocfs_static_vars lvars = { 0 };
4693         int rc;
4694         ENTRY;
4695
4696         /* Print the address of _any_ initialized kernel symbol from this
4697          * module, to allow debugging with a gdb that doesn't support data
4698          * symbols from modules. */
4699         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
4700
4701         rc = lu_kmem_init(osc_caches);
             if (rc)
                     RETURN(rc);
4702
4703         lprocfs_osc_init_vars(&lvars);
4704
4705         osc_quota_init();
4706         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4707                                  LUSTRE_OSC_NAME, &osc_device_type);
4708         if (rc) {
4709                 lu_kmem_fini(osc_caches);
4710                 RETURN(rc);
4711         }
4712
4713         cfs_spin_lock_init(&osc_ast_guard);
4714         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4715
4716         osc_mds_ost_orig_logops = llog_lvfs_ops;
4717         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4718         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4719         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4720         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4721
4722         RETURN(rc);
4723 }
4724
4725 #ifdef __KERNEL__
4726 static void /*__exit*/ osc_exit(void)
4727 {
4728         lu_device_type_fini(&osc_device_type);
4729
4730         osc_quota_exit();
4731         class_unregister_type(LUSTRE_OSC_NAME);
4732         lu_kmem_fini(osc_caches);
4733 }
4734
4735 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4736 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4737 MODULE_LICENSE("GPL");
4738
4739 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4740 #endif