lustre/osc/osc_request.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli,
                            int ptlrpc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
        }

        RETURN(lmm_size);
}
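
/* Added commentary on the calling convention, as inferred from the code
 * above: a NULL @lmmp is a pure size query (the LMM size is returned); a
 * non-NULL @lmmp with a NULL @lsm frees the packed buffer; otherwise the
 * buffer is allocated if needed and filled from @lsm.  A hypothetical caller
 * sketch:
 *
 *      size = osc_packmd(exp, NULL, lsm);      // query packed size only
 *      rc   = osc_packmd(exp, &lmm, lsm);      // allocate and pack
 *      rc   = osc_packmd(exp, &lmm, NULL);     // free the packed buffer
 *
 * osc_unpackmd() below follows the mirror-image convention for @lsmp/@lmm. */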

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
        }

        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
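
/* Added commentary: lsm_maxbytes above is taken from the server's advertised
 * ocd_maxbytes when the OBD_CONNECT_MAXBYTES flag was negotiated at connect
 * time, and falls back to the static LUSTRE_STRIPE_MAXBYTES limit when
 * talking to older servers that do not report a per-OST maximum. */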

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(sa->sa_oa, &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
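
/* Added commentary on the @rqset convention used above and in
 * osc_punch_base() below: a NULL set fires the RPC through ptlrpcd without
 * waiting for (or interpreting) the reply; the special PTLRPCD_SET token
 * also hands the request to ptlrpcd, but with the interpret callback wired
 * up; any other set is the caller's, and the caller is responsible for
 * driving it to completion. */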

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_seq = oa->o_seq;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

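/* Added commentary: the extent to punch is carried in the oa itself; o_size
 * holds the start offset and o_blocks the end, flagged valid via
 * OBD_MD_FLSIZE|OBD_MD_FLBLOCKS (the same field-overloading trick osc_sync()
 * uses below). */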
static int osc_punch(const struct lu_env *env, struct obd_export *exp,
                     struct obd_info *oinfo, struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo,
                              oinfo->oi_cb_up, oinfo, rqset);
}

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_async_args *aa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_oa = body->oa;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_sync(const struct lu_env *env, struct obd_export *exp,
                    struct obd_info *oinfo, obd_size start, obd_size end,
                    struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        if (!oinfo->oi_oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN (0);
}

/* Find and cancel all locally granted locks that match @mode in the resource
 * named by @oa. Matched locks are added to the @cancels list; returns the
 * number of locks added. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   cfs_list_t *cancels,
                                   ldlm_mode_t mode, int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        cfs_atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

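/* Added commentary: optimistically reserve a destroy-RPC slot by incrementing
 * the in-flight counter and checking it against cl_max_rpcs_in_flight.  On
 * failure the increment is rolled back, and the waitqueue is signalled if the
 * counter happened to drop below the limit between the two atomic
 * operations. */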
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything at
 * all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct lov_stripe_md *ea,
                       struct obd_trans_info *oti, struct obd_export *md_export,
                       void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * below cl_max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}

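/* Added commentary: advertise the client's cache state to the OST via the oa:
 * how much data is dirty (o_dirty), how much more it could dirty under its
 * current limits (o_undirty), the grant it currently holds (o_grant), and any
 * grant lost to truncation (o_dropped). */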
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (cfs_atomic_read(&obd_dirty_pages) -
                   cfs_atomic_read(&obd_dirty_transit_pages) >
                   obd_max_dirty_pages + 1) {
                /* The cfs_atomic_read()s and cfs_atomic_inc()s are not
                 * covered by a lock, so they may race and trip this CERROR()
                 * unless we add in a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        cfs_atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        cfs_atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                cfs_atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                /* Reclaim grant from truncated pages. This is used to solve
                 * the write-truncate problem where all the grant has gone to
                 * lost_grant. For a vfs write this problem can be easily
                 * solved by a sync write; however, this is not an option for
                 * page_mkwrite() because grant has to be allocated before a
                 * page becomes dirty. */
                if (cli->cl_avail_grant < PTLRPC_MAX_BRW_SIZE)
                        cli->cl_avail_grant += CFS_PAGE_SIZE;
                else
                        cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        cfs_list_t *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (cfs_atomic_read(&obd_dirty_pages) + 1 >
                    obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
                cfs_list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld\n",
                       ocw, ocw->ocw_oap, cli->cl_avail_grant);

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        long target = (cli->cl_max_rpcs_in_flight + 1) *
                      cli->cl_max_pages_per_rpc;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target)
                target = cli->cl_max_pages_per_rpc;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target);
}

int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int    rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
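/* Added commentary: decide whether this client should return grant to the
 * server; this requires that OBD_CONNECT_GRANT_SHRINK was negotiated, that
 * the shrink interval has (nearly, within 5 ticks) expired, that the import
 * is FULL, and that we hold more grant than GRANT_SHRINK_LIMIT. */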
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        cfs_list_for_each_entry(client, &item->ti_obd_list,
                                cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

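/* Added commentary (assumption about the surrounding machinery): register
 * this client on the shared TIMEOUT_GRANT list so that
 * osc_grant_shrink_grant_cb() is invoked periodically, every
 * cl_grant_shrink_interval seconds, by the ptlrpc timeout machinery. */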
static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                        client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty will
         * drop to 0 as in-flight RPCs fail out; otherwise, it's avail_grant
         * + dirty.
         *
         * The race is tolerable here: if we're evicted but imp_state has
         * already left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0, the OSS is probably not running"
                      " with the patch from bug20278 (%ld)\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
                /* workaround for 1.6 servers which do not have
                 * the patch from bug20278 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            cfs_list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT (page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}

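/* Added commentary: sanity-check a BRW write reply. Every per-niobuf return
 * code in the RMF_RCS vector must be zero, and the bulk must have transferred
 * exactly the number of bytes requested; anything else is treated as a
 * protocol error. */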
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

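/* Added commentary: two brw_pages can be merged into one remote niobuf only
 * if their flags are identical and they are byte-contiguous; a flag mismatch
 * outside the known-safe set additionally triggers the warning below. */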
1225 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1226 {
1227         if (p1->flag != p2->flag) {
1228                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1229                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1230
1231                 /* warn if we try to combine flags that we don't know to be
1232                  * safe to combine */
1233                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1234                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1235                               "report this at http://bugs.whamcloud.com/\n",
1236                               p1->flag, p2->flag);
1237                 }
1238                 return 0;
1239         }
1240
1241         return (p1->off + p1->count == p2->off);
1242 }
1243
1244 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1245                                    struct brw_page **pga, int opc,
1246                                    cksum_type_t cksum_type)
1247 {
1248         __u32 cksum;
1249         int i = 0;
1250
1251         LASSERT (pg_count > 0);
1252         cksum = init_checksum(cksum_type);
1253         while (nob > 0 && pg_count > 0) {
1254                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1255                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1256                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1257
1258                 /* corrupt the data before we compute the checksum, to
1259                  * simulate an OST->client data error */
1260                 if (i == 0 && opc == OST_READ &&
1261                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1262                         memcpy(ptr + off, "bad1", min(4, nob));
1263                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1264                 cfs_kunmap(pga[i]->pg);
1265                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1266                                off, cksum);
1267
1268                 nob -= pga[i]->count;
1269                 pg_count--;
1270                 i++;
1271         }
1272         /* For sending we only compute the wrong checksum instead
1273          * of corrupting the data so it is still correct on a redo */
1274         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1275                 cksum++;
1276
1277         return fini_checksum(cksum, cksum_type);
1278 }
1279
1280 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1281                                 struct lov_stripe_md *lsm, obd_count page_count,
1282                                 struct brw_page **pga,
1283                                 struct ptlrpc_request **reqp,
1284                                 struct obd_capa *ocapa, int reserve,
1285                                 int resend)
1286 {
1287         struct ptlrpc_request   *req;
1288         struct ptlrpc_bulk_desc *desc;
1289         struct ost_body         *body;
1290         struct obd_ioobj        *ioobj;
1291         struct niobuf_remote    *niobuf;
1292         int niocount, i, requested_nob, opc, rc;
1293         struct osc_brw_async_args *aa;
1294         struct req_capsule      *pill;
1295         struct brw_page *pg_prev;
1296
1297         ENTRY;
1298         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1299                 RETURN(-ENOMEM); /* Recoverable */
1300         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1301                 RETURN(-EINVAL); /* Fatal */
1302
1303         if ((cmd & OBD_BRW_WRITE) != 0) {
1304                 opc = OST_WRITE;
1305                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1306                                                 cli->cl_import->imp_rq_pool,
1307                                                 &RQF_OST_BRW_WRITE);
1308         } else {
1309                 opc = OST_READ;
1310                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1311         }
1312         if (req == NULL)
1313                 RETURN(-ENOMEM);
1314
1315         for (niocount = i = 1; i < page_count; i++) {
1316                 if (!can_merge_pages(pga[i - 1], pga[i]))
1317                         niocount++;
1318         }
1319
1320         pill = &req->rq_pill;
1321         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1322                              sizeof(*ioobj));
1323         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1324                              niocount * sizeof(*niobuf));
1325         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1326
1327         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1328         if (rc) {
1329                 ptlrpc_request_free(req);
1330                 RETURN(rc);
1331         }
1332         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1333         ptlrpc_at_set_req_timeout(req);
1334
1335         if (opc == OST_WRITE)
1336                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1337                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1338         else
1339                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1340                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1341
1342         if (desc == NULL)
1343                 GOTO(out, rc = -ENOMEM);
1344         /* NB request now owns desc and will free it when it gets freed */
1345
1346         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1347         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1348         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1349         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1350
1351         lustre_set_wire_obdo(&body->oa, oa);
1352
1353         obdo_to_ioobj(oa, ioobj);
1354         ioobj->ioo_bufcnt = niocount;
1355         osc_pack_capa(req, body, ocapa);
1356         LASSERT (page_count > 0);
1357         pg_prev = pga[0];
1358         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1359                 struct brw_page *pg = pga[i];
1360                 int poff = pg->off & ~CFS_PAGE_MASK;
1361
1362                 LASSERT(pg->count > 0);
1363                 /* make sure there is no gap in the middle of page array */
1364                 LASSERTF(page_count == 1 ||
1365                          (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1366                           ergo(i > 0 && i < page_count - 1,
1367                                poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
1368                           ergo(i == page_count - 1, poff == 0)),
1369                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1370                          i, page_count, pg, pg->off, pg->count);
1371 #ifdef __linux__
1372                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1373                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1374                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1375                          i, page_count,
1376                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1377                          pg_prev->pg, page_private(pg_prev->pg),
1378                          pg_prev->pg->index, pg_prev->off);
1379 #else
1380                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1381                          "i %d p_c %u\n", i, page_count);
1382 #endif
1383                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1384                         (pg->flag & OBD_BRW_SRVLOCK));
1385
1386                 ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
1387                 requested_nob += pg->count;
1388
1389                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1390                         niobuf--;
1391                         niobuf->len += pg->count;
1392                 } else {
1393                         niobuf->offset = pg->off;
1394                         niobuf->len    = pg->count;
1395                         niobuf->flags  = pg->flag;
1396                 }
1397                 pg_prev = pg;
1398         }
1399
1400         LASSERTF((void *)(niobuf - niocount) ==
1401                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1402                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1403                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1404
1405         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1406         if (resend) {
1407                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1408                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1409                         body->oa.o_flags = 0;
1410                 }
1411                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1412         }
1413
1414         if (osc_should_shrink_grant(cli))
1415                 osc_shrink_grant_local(cli, &body->oa);
1416
1417         /* size[REQ_REC_OFF] is still sizeof(*body) */
1418         if (opc == OST_WRITE) {
1419                 if (cli->cl_checksum &&
1420                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1421                         /* store cl_cksum_type in a local variable since
1422                          * it can be changed via lprocfs */
1423                         cksum_type_t cksum_type = cli->cl_cksum_type;
1424
1425                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1426                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1427                                 body->oa.o_flags = 0;
1428                         }
1429                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1430                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1431                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1432                                                              page_count, pga,
1433                                                              OST_WRITE,
1434                                                              cksum_type);
1435                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1436                                body->oa.o_cksum);
1437                         /* save this in 'oa', too, for later checking */
1438                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1439                         oa->o_flags |= cksum_type_pack(cksum_type);
1440                 } else {
1441                         /* clear out the checksum flag, in case this is a
1442                          * resend but cl_checksum is no longer set. b=11238 */
1443                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1444                 }
1445                 oa->o_cksum = body->oa.o_cksum;
1446                 /* 1 RC per niobuf */
1447                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1448                                      sizeof(__u32) * niocount);
1449         } else {
1450                 if (cli->cl_checksum &&
1451                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1452                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1453                                 body->oa.o_flags = 0;
1454                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1455                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1456                 }
1457         }
1458         ptlrpc_request_set_replen(req);
1459
1460         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1461         aa = ptlrpc_req_async_args(req);
1462         aa->aa_oa = oa;
1463         aa->aa_requested_nob = requested_nob;
1464         aa->aa_nio_count = niocount;
1465         aa->aa_page_count = page_count;
1466         aa->aa_resends = 0;
1467         aa->aa_ppga = pga;
1468         aa->aa_cli = cli;
1469         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1470         if (ocapa && reserve)
1471                 aa->aa_ocapa = capa_get(ocapa);
1472
1473         *reqp = req;
1474         RETURN(0);
1475
1476  out:
1477         ptlrpc_req_finished(req);
1478         RETURN(rc);
1479 }
1480
1481 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1482                                 __u32 client_cksum, __u32 server_cksum, int nob,
1483                                 obd_count page_count, struct brw_page **pga,
1484                                 cksum_type_t client_cksum_type)
1485 {
1486         __u32 new_cksum;
1487         char *msg;
1488         cksum_type_t cksum_type;
1489
1490         if (server_cksum == client_cksum) {
1491                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1492                 return 0;
1493         }
1494
1495         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1496                                        oa->o_flags : 0);
1497         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1498                                       cksum_type);
1499
1500         if (cksum_type != client_cksum_type)
1501                 msg = "the server did not use the checksum type specified in "
1502                       "the original request - likely a protocol problem";
1503         else if (new_cksum == server_cksum)
1504                 msg = "changed on the client after we checksummed it - "
1505                       "likely false positive due to mmap IO (bug 11742)";
1506         else if (new_cksum == client_cksum)
1507                 msg = "changed in transit before arrival at OST";
1508         else
1509                 msg = "changed in transit AND doesn't match the original - "
1510                       "likely false positive due to mmap IO (bug 11742)";
1511
1512         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1513                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1514                            msg, libcfs_nid2str(peer->nid),
1515                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1516                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1517                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1518                            oa->o_id,
1519                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1520                            pga[0]->off,
1521                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1522         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1523                "client csum now %x\n", client_cksum, client_cksum_type,
1524                server_cksum, cksum_type, new_cksum);
1525         return 1;
1526 }
1527
1528 /* Note: rc enters this function as the number of bytes transferred */
1529 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1530 {
1531         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1532         const lnet_process_id_t *peer =
1533                         &req->rq_import->imp_connection->c_peer;
1534         struct client_obd *cli = aa->aa_cli;
1535         struct ost_body *body;
1536         __u32 client_cksum = 0;
1537         ENTRY;
1538
1539         if (rc < 0 && rc != -EDQUOT) {
1540                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1541                 RETURN(rc);
1542         }
1543
1544         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1545         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1546         if (body == NULL) {
1547                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1548                 RETURN(-EPROTO);
1549         }
1550
1551         /* set/clear the over-quota flag for a uid/gid */
1552         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1553             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1554                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1555
1556                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1557                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1558                        body->oa.o_flags);
1559                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1560         }
1561
1562         osc_update_grant(cli, body);
1563
1564         if (rc < 0)
1565                 RETURN(rc);
1566
1567         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1568                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1569
1570         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1571                 if (rc > 0) {
1572                         CERROR("Unexpected +ve rc %d\n", rc);
1573                         RETURN(-EPROTO);
1574                 }
1575                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1576
1577                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1578                         RETURN(-EAGAIN);
1579
1580                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1581                     check_write_checksum(&body->oa, peer, client_cksum,
1582                                          body->oa.o_cksum, aa->aa_requested_nob,
1583                                          aa->aa_page_count, aa->aa_ppga,
1584                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1585                         RETURN(-EAGAIN);
1586
1587                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1588                                      aa->aa_page_count, aa->aa_ppga);
1589                 GOTO(out, rc);
1590         }
1591
1592         /* The rest of this function executes only for OST_READs */
1593
1594         /* if unwrap_bulk failed, return -EAGAIN to retry */
1595         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1596         if (rc < 0)
1597                 GOTO(out, rc = -EAGAIN);
1598
1599         if (rc > aa->aa_requested_nob) {
1600                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1601                        aa->aa_requested_nob);
1602                 RETURN(-EPROTO);
1603         }
1604
1605         if (rc != req->rq_bulk->bd_nob_transferred) {
1606                 CERROR("Unexpected rc %d (%d transferred)\n",
1607                        rc, req->rq_bulk->bd_nob_transferred);
1608                 RETURN(-EPROTO);
1609         }
1610
1611         if (rc < aa->aa_requested_nob)
1612                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1613
1614         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1615                 static int cksum_counter;
1616                 __u32      server_cksum = body->oa.o_cksum;
1617                 char      *via;
1618                 char      *router;
1619                 cksum_type_t cksum_type;
1620
1621                 cksum_type = cksum_type_unpack(body->oa.o_valid &
1622                                                OBD_MD_FLFLAGS ? body->oa.o_flags : 0);
1623                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1624                                                  aa->aa_ppga, OST_READ,
1625                                                  cksum_type);
1626
1627                 if (peer->nid == req->rq_bulk->bd_sender) {
1628                         via = router = "";
1629                 } else {
1630                         via = " via ";
1631                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1632                 }
1633
1634                 if (server_cksum == ~0 && rc > 0) {
1635                         CERROR("Protocol error: server %s set the 'checksum' "
1636                                "bit, but didn't send a checksum.  Not fatal, "
1637                                "but please notify on http://bugs.whamcloud.com/\n",
1638                                libcfs_nid2str(peer->nid));
1639                 } else if (server_cksum != client_cksum) {
1640                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1641                                            "%s%s%s inode "DFID" object "
1642                                            LPU64"/"LPU64" extent "
1643                                            "["LPU64"-"LPU64"]\n",
1644                                            req->rq_import->imp_obd->obd_name,
1645                                            libcfs_nid2str(peer->nid),
1646                                            via, router,
1647                                            body->oa.o_valid & OBD_MD_FLFID ?
1648                                                 body->oa.o_parent_seq : (__u64)0,
1649                                            body->oa.o_valid & OBD_MD_FLFID ?
1650                                                 body->oa.o_parent_oid : 0,
1651                                            body->oa.o_valid & OBD_MD_FLFID ?
1652                                                 body->oa.o_parent_ver : 0,
1653                                            body->oa.o_id,
1654                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1655                                                 body->oa.o_seq : (__u64)0,
1656                                            aa->aa_ppga[0]->off,
1657                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1658                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1659                                                                         1);
1660                         CERROR("client %x, server %x, cksum_type %x\n",
1661                                client_cksum, server_cksum, cksum_type);
1662                         cksum_counter = 0;
1663                         aa->aa_oa->o_cksum = client_cksum;
1664                         rc = -EAGAIN;
1665                 } else {
1666                         cksum_counter++;
1667                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1668                         rc = 0;
1669                 }
1670         } else if (unlikely(client_cksum)) {
1671                 static int cksum_missed;
1672
1673                 cksum_missed++;
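                     /* x & -x == x only for powers of two, so this logs the
                      * 1st, 2nd, 4th, 8th, ... miss rather than every one. */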
1674                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1675                         CERROR("Checksum %u requested from %s but not sent\n",
1676                                cksum_missed, libcfs_nid2str(peer->nid));
1677         } else {
1678                 rc = 0;
1679         }
1680 out:
1681         if (rc >= 0)
1682                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1683
1684         RETURN(rc);
1685 }
1686
1687 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1688                             struct lov_stripe_md *lsm,
1689                             obd_count page_count, struct brw_page **pga,
1690                             struct obd_capa *ocapa)
1691 {
1692         struct ptlrpc_request *req;
1693         int                    rc;
1694         cfs_waitq_t            waitq;
1695         int                    generation, resends = 0;
1696         struct l_wait_info     lwi;
1697
1698         ENTRY;
1699
1700         cfs_waitq_init(&waitq);
1701         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1702
1703 restart_bulk:
1704         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1705                                   page_count, pga, &req, ocapa, 0, resends);
1706         if (rc != 0)
1707                 return (rc);
1708
1709         if (resends) {
1710                 req->rq_generation_set = 1;
1711                 req->rq_import_generation = generation;
1712                 req->rq_sent = cfs_time_current_sec() + resends;
1713         }
1714
1715         rc = ptlrpc_queue_wait(req);
1716
1717         if (rc == -ETIMEDOUT && req->rq_resend) {
1718                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1719                 ptlrpc_req_finished(req);
1720                 goto restart_bulk;
1721         }
1722
1723         rc = osc_brw_fini_request(req, rc);
1724
1725         ptlrpc_req_finished(req);
1726         /* When the server returns -EINPROGRESS, the client should always
1727          * retry, regardless of how many times the bulk was already resent. */
1728         if (osc_recoverable_error(rc)) {
1729                 resends++;
1730                 if (rc != -EINPROGRESS &&
1731                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1732                         CERROR("%s: too many resend retries for object: "
1733                                ""LPU64":"LPU64", rc = %d.\n",
1734                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1735                         goto out;
1736                 }
1737                 if (generation !=
1738                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1739                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1740                                ""LPU64":"LPU64", rc = %d.\n",
1741                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1742                         goto out;
1743                 }
1744
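                     /* Back off linearly: the wait condition below is never
                      * true, so this sleeps for 'resends' seconds (or until a
                      * signal arrives) before resending the bulk. */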
1745                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1746                                        NULL);
1747                 l_wait_event(waitq, 0, &lwi);
1748
1749                 goto restart_bulk;
1750         }
1751 out:
1752         if (rc == -EAGAIN || rc == -EINPROGRESS)
1753                 rc = -EIO;
1754         RETURN (rc);
1755 }
1756
1757 int osc_brw_redo_request(struct ptlrpc_request *request,
1758                          struct osc_brw_async_args *aa)
1759 {
1760         struct ptlrpc_request *new_req;
1761         struct ptlrpc_request_set *set = request->rq_set;
1762         struct osc_brw_async_args *new_aa;
1763         struct osc_async_page *oap;
1764         int rc = 0;
1765         ENTRY;
1766
1767         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1768
1769         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1770                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1771                                   aa->aa_cli, aa->aa_oa,
1772                                   NULL /* lsm unused by osc currently */,
1773                                   aa->aa_page_count, aa->aa_ppga,
1774                                   &new_req, aa->aa_ocapa, 0, 1);
1775         if (rc)
1776                 RETURN(rc);
1777
1778         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1779
1780         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1781                 if (oap->oap_request != NULL) {
1782                         LASSERTF(request == oap->oap_request,
1783                                  "request %p != oap_request %p\n",
1784                                  request, oap->oap_request);
1785                         if (oap->oap_interrupted) {
1786                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1787                                 ptlrpc_req_finished(new_req);
1788                                 RETURN(-EINTR);
1789                         }
1790                 }
1791         }
1792         /* The new request takes over pga and oaps from the old request.
1793          * Note that copying a list_head doesn't work; it has to be moved. */
1794         aa->aa_resends++;
1795         new_req->rq_interpret_reply = request->rq_interpret_reply;
1796         new_req->rq_async_args = request->rq_async_args;
1797         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1798         new_req->rq_generation_set = 1;
1799         new_req->rq_import_generation = request->rq_import_generation;
1800
1801         new_aa = ptlrpc_req_async_args(new_req);
1802
1803         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1804         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1805         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1806
1807         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1808                 if (oap->oap_request) {
1809                         ptlrpc_req_finished(oap->oap_request);
1810                         oap->oap_request = ptlrpc_request_addref(new_req);
1811                 }
1812         }
1813
1814         new_aa->aa_ocapa = aa->aa_ocapa;
1815         aa->aa_ocapa = NULL;
1816
1817         /* Using ptlrpc_set_add_req() is safe here because interpret
1818          * functions run in check_set context.  The only path by which
1819          * another thread can reach this request is the -EINTR case, and
1820          * that path is protected by cl_loi_list_lock. */
1821         ptlrpc_set_add_req(set, new_req);
1822
1823         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1824
1825         DEBUG_REQ(D_INFO, new_req, "new request");
1826         RETURN(0);
1827 }
1828
1829 /*
1830  * Ugh, we want disk allocation on the target to happen in offset order.  We
1831  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1832  * fine for our small page arrays and doesn't require allocation.  It's an
1833  * insertion sort that swaps elements that are strides apart, shrinking the
1834  * stride down until it's 1 and the array is sorted.
1835  */
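     /*
      * For example, with num = 100 the seeding loop grows the stride
      * 1 -> 4 -> 13 -> 40 -> 121 and stops; the sort then makes passes with
      * strides 40, 13 and 4, and the final stride-1 pass is an ordinary
      * insertion sort over an almost-sorted array.
      */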
1836 static void sort_brw_pages(struct brw_page **array, int num)
1837 {
1838         int stride, i, j;
1839         struct brw_page *tmp;
1840
1841         if (num == 1)
1842                 return;
1843         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1844                 ;
1845
1846         do {
1847                 stride /= 3;
1848                 for (i = stride ; i < num ; i++) {
1849                         tmp = array[i];
1850                         j = i;
1851                         while (j >= stride && array[j - stride]->off > tmp->off) {
1852                                 array[j] = array[j - stride];
1853                                 j -= stride;
1854                         }
1855                         array[j] = tmp;
1856                 }
1857         } while (stride > 1);
1858 }
1859
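     /* Return the length of the longest prefix of @pg that can be sent as a
      * single unfragmented transfer: the first page may start at a non-zero
      * offset, but every counted page except the last must end on a page
      * boundary and every page after the first must start on one.  E.g. with
      * 4K pages, extents [0,4K), [4K,8K), [9K,12K) yield 2, because the third
      * extent starts mid-page. */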
1860 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1861 {
1862         int count = 1;
1863         int offset;
1864         int i = 0;
1865
1866         LASSERT(pages > 0);
1867         offset = pg[i]->off & ~CFS_PAGE_MASK;
1868
1869         for (;;) {
1870                 pages--;
1871                 if (pages == 0)         /* that's all */
1872                         return count;
1873
1874                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1875                         return count;   /* doesn't end on page boundary */
1876
1877                 i++;
1878                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1879                 if (offset != 0)        /* doesn't start on page boundary */
1880                         return count;
1881
1882                 count++;
1883         }
1884 }
1885
1886 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1887 {
1888         struct brw_page **ppga;
1889         int i;
1890
1891         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1892         if (ppga == NULL)
1893                 return NULL;
1894
1895         for (i = 0; i < count; i++)
1896                 ppga[i] = pga + i;
1897         return ppga;
1898 }
1899
1900 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1901 {
1902         LASSERT(ppga != NULL);
1903         OBD_FREE(ppga, sizeof(*ppga) * count);
1904 }
1905
1906 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1907                    obd_count page_count, struct brw_page *pga,
1908                    struct obd_trans_info *oti)
1909 {
1910         struct obdo *saved_oa = NULL;
1911         struct brw_page **ppga, **orig;
1912         struct obd_import *imp = class_exp2cliimp(exp);
1913         struct client_obd *cli;
1914         int rc, page_count_orig;
1915         ENTRY;
1916
1917         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1918         cli = &imp->imp_obd->u.cli;
1919
1920         if (cmd & OBD_BRW_CHECK) {
1921                 /* The caller just wants to know if there's a chance that this
1922                  * I/O can succeed */
1923
1924                 if (imp->imp_invalid)
1925                         RETURN(-EIO);
1926                 RETURN(0);
1927         }
1928
1929         /* test_brw with a failed create can trip this, maybe others. */
1930         LASSERT(cli->cl_max_pages_per_rpc);
1931
1932         rc = 0;
1933
1934         orig = ppga = osc_build_ppga(pga, page_count);
1935         if (ppga == NULL)
1936                 RETURN(-ENOMEM);
1937         page_count_orig = page_count;
1938
1939         sort_brw_pages(ppga, page_count);
1940         while (page_count) {
1941                 obd_count pages_per_brw;
1942
1943                 if (page_count > cli->cl_max_pages_per_rpc)
1944                         pages_per_brw = cli->cl_max_pages_per_rpc;
1945                 else
1946                         pages_per_brw = page_count;
1947
1948                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1949
1950                 if (saved_oa != NULL) {
1951                         /* restore previously saved oa */
1952                         *oinfo->oi_oa = *saved_oa;
1953                 } else if (page_count > pages_per_brw) {
1954                         /* save a copy of oa (brw will clobber it) */
1955                         OBDO_ALLOC(saved_oa);
1956                         if (saved_oa == NULL)
1957                                 GOTO(out, rc = -ENOMEM);
1958                         *saved_oa = *oinfo->oi_oa;
1959                 }
1960
1961                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1962                                       pages_per_brw, ppga, oinfo->oi_capa);
1963
1964                 if (rc != 0)
1965                         break;
1966
1967                 page_count -= pages_per_brw;
1968                 ppga += pages_per_brw;
1969         }
1970
1971 out:
1972         osc_release_ppga(orig, page_count_orig);
1973
1974         if (saved_oa != NULL)
1975                 OBDO_FREE(saved_oa);
1976
1977         RETURN(rc);
1978 }
1979
1980 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1981  * the dirty accounting.  Writeback completes or truncate happens before
1982  * writing starts.  Must be called with the loi lock held. */
1983 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1984                            int sent)
1985 {
1986         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1987 }
1988
1989
1990 /* This maintains the lists of pending pages to read/write for a given object
1991  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1992  * to quickly find objects that are ready to send an RPC. */
1993 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1994                          int cmd)
1995 {
1996         ENTRY;
1997
1998         if (lop->lop_num_pending == 0)
1999                 RETURN(0);
2000
2001         /* if we have an invalid import we want to drain the queued pages
2002          * by forcing them through rpcs that immediately fail and complete
2003          * the pages.  recovery relies on this to empty the queued pages
2004          * before canceling the locks and evicting down the llite pages */
2005         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2006                 RETURN(1);
2007
2008         /* stream rpcs in queue order as long as there is an urgent page
2009          * queued.  this is our cheap solution for good batching in the case
2010          * where writepage marks some random page in the middle of the file
2011          * as urgent because of, say, memory pressure */
2012         if (!cfs_list_empty(&lop->lop_urgent)) {
2013                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
2014                 RETURN(1);
2015         }
2016
2017         if (cmd & OBD_BRW_WRITE) {
2018                 /* trigger a write rpc stream as long as there are dirtiers
2019                  * waiting for space.  as they're waiting, they're not going to
2020                  * create more pages to coalesce with what's waiting. */
2021                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
2022                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
2023                         RETURN(1);
2024                 }
2025         }
2026         if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc)
2027                 RETURN(1);
2028
2029         RETURN(0);
2030 }
2031
2032 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2033 {
2034         struct osc_async_page *oap;
2035         ENTRY;
2036
2037         if (cfs_list_empty(&lop->lop_urgent))
2038                 RETURN(0);
2039
2040         oap = cfs_list_entry(lop->lop_urgent.next,
2041                          struct osc_async_page, oap_urgent_item);
2042
2043         if (oap->oap_async_flags & ASYNC_HP) {
2044                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2045                 RETURN(1);
2046         }
2047
2048         RETURN(0);
2049 }
2050
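     /* Make @item's list membership match @should_be_on; an empty
      * (self-linked) list_head means the item is not currently on any list. */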
2051 static void on_list(cfs_list_t *item, cfs_list_t *list,
2052                     int should_be_on)
2053 {
2054         if (cfs_list_empty(item) && should_be_on)
2055                 cfs_list_add_tail(item, list);
2056         else if (!cfs_list_empty(item) && !should_be_on)
2057                 cfs_list_del_init(item);
2058 }
2059
2060 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2061  * can find pages to build into rpcs quickly */
2062 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2063 {
2064         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2065             lop_makes_hprpc(&loi->loi_read_lop)) {
2066                 /* HP rpc */
2067                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2068                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2069         } else {
2070                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2071                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2072                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2073                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2074         }
2075
2076         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2077                 loi->loi_write_lop.lop_num_pending);
2078
2079         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2080                 loi->loi_read_lop.lop_num_pending);
2081 }
2082
2083 static void lop_update_pending(struct client_obd *cli,
2084                                struct loi_oap_pages *lop, int cmd, int delta)
2085 {
2086         lop->lop_num_pending += delta;
2087         if (cmd & OBD_BRW_WRITE)
2088                 cli->cl_pending_w_pages += delta;
2089         else
2090                 cli->cl_pending_r_pages += delta;
2091 }
2092
2093 /**
2094  * this is called when a sync waiter receives an interruption.  Its job is to
2095  * get the caller woken as soon as possible.  If its page hasn't been put in an
2096  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2097  * desiring interruption, which will forcefully complete the rpc once the rpc
2098  * has timed out.
2099  */
2100 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2101 {
2102         struct loi_oap_pages *lop;
2103         struct lov_oinfo *loi;
2104         int rc = -EBUSY;
2105         ENTRY;
2106
2107         LASSERT(!oap->oap_interrupted);
2108         oap->oap_interrupted = 1;
2109
2110         /* ok, it's been put in an rpc. only one oap gets a request reference */
2111         if (oap->oap_request != NULL) {
2112                 ptlrpc_mark_interrupted(oap->oap_request);
2113                 ptlrpcd_wake(oap->oap_request);
2114                 ptlrpc_req_finished(oap->oap_request);
2115                 oap->oap_request = NULL;
2116         }
2117
2118         /*
2119          * page completion may be called only if the ->cpo_prep() method was
2120          * executed by osc_io_submit(), which also adds the page to the pending list
2121          */
2122         if (!cfs_list_empty(&oap->oap_pending_item)) {
2123                 cfs_list_del_init(&oap->oap_pending_item);
2124                 cfs_list_del_init(&oap->oap_urgent_item);
2125
2126                 loi = oap->oap_loi;
2127                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2128                         &loi->loi_write_lop : &loi->loi_read_lop;
2129                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2130                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2131                 rc = oap->oap_caller_ops->ap_completion(env,
2132                                           oap->oap_caller_data,
2133                                           oap->oap_cmd, NULL, -EINTR);
2134         }
2135
2136         RETURN(rc);
2137 }
2138
2139 /* this is trying to propagate async writeback errors back up to the
2140  * application.  When an async write fails we record the error code for later
2141  * if the app does an fsync.  As long as errors persist we force future rpcs
2142  * to be sync so that the app can get a sync error and break the cycle of
2143  * queueing pages for which writeback will fail. */
2144 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2145                            int rc)
2146 {
2147         if (rc) {
2148                 if (!ar->ar_rc)
2149                         ar->ar_rc = rc;
2150
2151                 ar->ar_force_sync = 1;
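                     /* Sample the next xid to be assigned; force_sync stays
                      * set until an rpc with an xid at or beyond this mark
                      * completes without error (see the check below). */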
2152                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2153                 return;
2154
2155         }
2156
2157         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2158                 ar->ar_force_sync = 0;
2159 }
2160
2161 void osc_oap_to_pending(struct osc_async_page *oap)
2162 {
2163         struct loi_oap_pages *lop;
2164
2165         if (oap->oap_cmd & OBD_BRW_WRITE)
2166                 lop = &oap->oap_loi->loi_write_lop;
2167         else
2168                 lop = &oap->oap_loi->loi_read_lop;
2169
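             /* ASYNC_HP pages jump to the head of the urgent list, while
              * merely urgent pages keep FIFO order at its tail. */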
2170         if (oap->oap_async_flags & ASYNC_HP)
2171                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2172         else if (oap->oap_async_flags & ASYNC_URGENT)
2173                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2174         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2175         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2176 }
2177
2178 /* this must be called holding the loi list lock to give coverage to exit_cache,
2179  * async_flag maintenance, and oap_request */
2180 static void osc_ap_completion(const struct lu_env *env,
2181                               struct client_obd *cli, struct obdo *oa,
2182                               struct osc_async_page *oap, int sent, int rc)
2183 {
2184         __u64 xid = 0;
2185
2186         ENTRY;
2187         if (oap->oap_request != NULL) {
2188                 xid = ptlrpc_req_xid(oap->oap_request);
2189                 ptlrpc_req_finished(oap->oap_request);
2190                 oap->oap_request = NULL;
2191         }
2192
2193         cfs_spin_lock(&oap->oap_lock);
2194         oap->oap_async_flags = 0;
2195         cfs_spin_unlock(&oap->oap_lock);
2196         oap->oap_interrupted = 0;
2197
2198         if (oap->oap_cmd & OBD_BRW_WRITE) {
2199                 osc_process_ar(&cli->cl_ar, xid, rc);
2200                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2201         }
2202
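             /* On success, fold the block count and timestamps the OST
              * returned into the cached lvb so later glimpses see
              * up-to-date values. */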
2203         if (rc == 0 && oa != NULL) {
2204                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2205                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2206                 if (oa->o_valid & OBD_MD_FLMTIME)
2207                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2208                 if (oa->o_valid & OBD_MD_FLATIME)
2209                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2210                 if (oa->o_valid & OBD_MD_FLCTIME)
2211                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2212         }
2213
2214         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2215                                                 oap->oap_cmd, oa, rc);
2216
2217         /* cl_page_completion() drops PG_locked, so a new I/O on the page could
2218          * start, but OSC calls it under lock and thus we can add oap back to
2219          * pending safely */
2220         if (rc)
2221                 /* upper layer wants to leave the page on pending queue */
2222                 osc_oap_to_pending(oap);
2223         else
2224                 osc_exit_cache(cli, oap, sent);
2225         EXIT;
2226 }
2227
2228 static int brw_queue_work(const struct lu_env *env, void *data)
2229 {
2230         struct client_obd *cli = data;
2231
2232         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2233
2234         client_obd_list_lock(&cli->cl_loi_list_lock);
2235         osc_check_rpcs0(env, cli, 1);
2236         client_obd_list_unlock(&cli->cl_loi_list_lock);
2237         RETURN(0);
2238 }
2239
2240 static int brw_interpret(const struct lu_env *env,
2241                          struct ptlrpc_request *req, void *data, int rc)
2242 {
2243         struct osc_brw_async_args *aa = data;
2244         struct client_obd *cli;
2245         int async;
2246         ENTRY;
2247
2248         rc = osc_brw_fini_request(req, rc);
2249         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2250         /* When the server returns -EINPROGRESS, the client should always
2251          * retry, regardless of how many times the bulk was already resent. */
2252         if (osc_recoverable_error(rc)) {
2253                 if (req->rq_import_generation !=
2254                     req->rq_import->imp_generation) {
2255                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2256                                ""LPU64":"LPU64", rc = %d.\n",
2257                                req->rq_import->imp_obd->obd_name,
2258                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
2259                 } else if (rc == -EINPROGRESS ||
2260                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2261                         rc = osc_brw_redo_request(req, aa);
2262                 } else {
2263                         CERROR("%s: too many resend retries for object: "
2264                                ""LPU64":"LPU64", rc = %d.\n",
2265                                req->rq_import->imp_obd->obd_name,
2266                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
2267                 }
2268
2269                 if (rc == 0)
2270                         RETURN(0);
2271                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2272                         rc = -EIO;
2273         }
2274
2275         if (aa->aa_ocapa) {
2276                 capa_put(aa->aa_ocapa);
2277                 aa->aa_ocapa = NULL;
2278         }
2279
2280         cli = aa->aa_cli;
2281         client_obd_list_lock(&cli->cl_loi_list_lock);
2282
2283         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2284          * is called so we know whether to go to sync BRWs or wait for more
2285          * RPCs to complete */
2286         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2287                 cli->cl_w_in_flight--;
2288         else
2289                 cli->cl_r_in_flight--;
2290
2291         async = cfs_list_empty(&aa->aa_oaps);
2292         if (!async) { /* from osc_send_oap_rpc() */
2293                 struct osc_async_page *oap, *tmp;
2294                 /* the caller may re-use the oap after the completion call so
2295                  * we need to clean it up a little */
2296                 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2297                                              oap_rpc_item) {
2298                         cfs_list_del_init(&oap->oap_rpc_item);
2299                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2300                 }
2301                 OBDO_FREE(aa->aa_oa);
2302         } else { /* from async_internal() */
2303                 obd_count i;
2304                 for (i = 0; i < aa->aa_page_count; i++)
2305                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2306         }
2307         osc_wake_cache_waiters(cli);
2308         osc_check_rpcs0(env, cli, 1);
2309         client_obd_list_unlock(&cli->cl_loi_list_lock);
2310
2311         if (!async)
2312                 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2313                                   req->rq_bulk->bd_nob_transferred);
2314         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2315         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2316
2317         RETURN(rc);
2318 }
2319
2320 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2321                                             struct client_obd *cli,
2322                                             cfs_list_t *rpc_list,
2323                                             int page_count, int cmd)
2324 {
2325         struct ptlrpc_request *req;
2326         struct brw_page **pga = NULL;
2327         struct osc_brw_async_args *aa;
2328         struct obdo *oa = NULL;
2329         const struct obd_async_page_ops *ops = NULL;
2330         struct osc_async_page *oap;
2331         struct osc_async_page *tmp;
2332         struct cl_req *clerq = NULL;
2333         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2334         struct ldlm_lock *lock = NULL;
2335         struct cl_req_attr crattr;
2336         int i, rc, mpflag = 0;
2337
2338         ENTRY;
2339         LASSERT(!cfs_list_empty(rpc_list));
2340
2341         if (cmd & OBD_BRW_MEMALLOC)
2342                 mpflag = cfs_memory_pressure_get_and_set();
2343
2344         memset(&crattr, 0, sizeof crattr);
2345         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2346         if (pga == NULL)
2347                 GOTO(out, req = ERR_PTR(-ENOMEM));
2348
2349         OBDO_ALLOC(oa);
2350         if (oa == NULL)
2351                 GOTO(out, req = ERR_PTR(-ENOMEM));
2352
2353         i = 0;
2354         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2355                 struct cl_page *page = osc_oap2cl_page(oap);
2356                 if (ops == NULL) {
2357                         ops = oap->oap_caller_ops;
2358
2359                         clerq = cl_req_alloc(env, page, crt,
2360                                              1 /* only 1-object rpcs for
2361                                                 * now */);
2362                         if (IS_ERR(clerq))
2363                                 GOTO(out, req = (void *)clerq);
2364                         lock = oap->oap_ldlm_lock;
2365                 }
2366                 pga[i] = &oap->oap_brw_page;
2367                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2368                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2369                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2370                 i++;
2371                 cl_req_page_add(env, clerq, page);
2372         }
2373
2374         /* always get the data for the obdo for the rpc */
2375         LASSERT(ops != NULL);
2376         crattr.cra_oa = oa;
2377         crattr.cra_capa = NULL;
2378         memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2379         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2380         if (lock) {
2381                 oa->o_handle = lock->l_remote_handle;
2382                 oa->o_valid |= OBD_MD_FLHANDLE;
2383         }
2384
2385         rc = cl_req_prep(env, clerq);
2386         if (rc != 0) {
2387                 CERROR("cl_req_prep failed: %d\n", rc);
2388                 GOTO(out, req = ERR_PTR(rc));
2389         }
2390
2391         sort_brw_pages(pga, page_count);
2392         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2393                                   pga, &req, crattr.cra_capa, 1, 0);
2394         if (rc != 0) {
2395                 CERROR("prep_req failed: %d\n", rc);
2396                 GOTO(out, req = ERR_PTR(rc));
2397         }
2398
2399         if (cmd & OBD_BRW_MEMALLOC)
2400                 req->rq_memalloc = 1;
2401
2402         /* Need to update the timestamps after the request is built in case
2403          * we race with setattr (locally or in queue at the OST).  If the OST
2404          * gets a later setattr before an earlier BRW (as determined by the
2405          * request xid), the OST will not use the BRW timestamps.  Sadly, there
2406          * is no obvious way to do this in a single call.  bug 10150 */
2407         cl_req_attr_set(env, clerq, &crattr,
2408                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2409
2410         lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2411
2412         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2413         aa = ptlrpc_req_async_args(req);
2414         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2415         cfs_list_splice(rpc_list, &aa->aa_oaps);
2416         CFS_INIT_LIST_HEAD(rpc_list);
2417         aa->aa_clerq = clerq;
2418 out:
2419         if (cmd & OBD_BRW_MEMALLOC)
2420                 cfs_memory_pressure_restore(mpflag);
2421
2422         capa_put(crattr.cra_capa);
2423         if (IS_ERR(req)) {
2424                 if (oa)
2425                         OBDO_FREE(oa);
2426                 if (pga)
2427                         OBD_FREE(pga, sizeof(*pga) * page_count);
2428                 /* this should happen rarely and is pretty bad, it makes the
2429                  * pending list not follow the dirty order */
2430                 client_obd_list_lock(&cli->cl_loi_list_lock);
2431                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2432                         cfs_list_del_init(&oap->oap_rpc_item);
2433
2434                         /* queued sync pages can be torn down while the pages
2435                          * were between the pending list and the rpc */
2436                         if (oap->oap_interrupted) {
2437                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2438                                 osc_ap_completion(env, cli, NULL, oap, 0,
2439                                                   oap->oap_count);
2440                                 continue;
2441                         }
2442                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2443                 }
2444                 if (clerq && !IS_ERR(clerq))
2445                         cl_req_completion(env, clerq, PTR_ERR(req));
2446         }
2447         RETURN(req);
2448 }
2449
2450 /**
2451  * prepare pages for ASYNC io and put them in the send queue.
2452  *
2453  * \param cmd OBD_BRW_* macros
2454  * \param lop pending pages
2455  *
2456  * \return zero if no pages were added to the send queue.
2457  * \return 1 if pages were successfully added to the send queue.
2458  * \return negative on errors.
2459  */
2460 static int
2461 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2462                  struct lov_oinfo *loi, int cmd,
2463                  struct loi_oap_pages *lop, pdl_policy_t pol)
2464 {
2465         struct ptlrpc_request *req;
2466         obd_count page_count = 0;
2467         struct osc_async_page *oap = NULL, *tmp;
2468         struct osc_brw_async_args *aa;
2469         const struct obd_async_page_ops *ops;
2470         CFS_LIST_HEAD(rpc_list);
2471         int srvlock = 0, mem_tight = 0;
2472         struct cl_object *clob = NULL;
2473         obd_off starting_offset = OBD_OBJECT_EOF;
2474         unsigned int ending_offset;
2475         int starting_page_off = 0;
2476         ENTRY;
2477
2478         /* ASYNC_HP pages first.  At present, when the lock covering the
2479          * pages is to be canceled, the pages under it are sent out with
2480          * ASYNC_HP.  We have to send them out as soon as possible. */
2481         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2482                 if (oap->oap_async_flags & ASYNC_HP)
2483                         cfs_list_move(&oap->oap_pending_item, &rpc_list);
2484                 else if (!(oap->oap_brw_flags & OBD_BRW_SYNC))
2485                         /* only do this for writeback pages. */
2486                         cfs_list_move_tail(&oap->oap_pending_item, &rpc_list);
2487                 if (++page_count >= cli->cl_max_pages_per_rpc)
2488                         break;
2489         }
2490         cfs_list_splice_init(&rpc_list, &lop->lop_pending);
2491         page_count = 0;
2492
2493         /* first we find the pages we're allowed to work with */
2494         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2495                                      oap_pending_item) {
2496                 ops = oap->oap_caller_ops;
2497
2498                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2499                          "magic 0x%x\n", oap, oap->oap_magic);
2500
2501                 if (clob == NULL) {
2502                         /* pin object in memory, so that completion call-backs
2503                          * can be safely called under client_obd_list lock. */
2504                         clob = osc_oap2cl_page(oap)->cp_obj;
2505                         cl_object_get(clob);
2506                 }
2507
2508                 if (page_count != 0 &&
2509                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2510                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2511                                " oap %p, page %p, srvlock %u\n",
2512                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2513                         break;
2514                 }
2515
2516                 /* If there is a gap at the start of this page, it can't merge
2517                  * with any previous page, so we'll hand the network a
2518                  * "fragmented" page array that it can't transfer in 1 RDMA */
2519                 if (oap->oap_obj_off < starting_offset) {
2520                         if (starting_page_off != 0)
2521                                 break;
2522
2523                         starting_page_off = oap->oap_page_off;
2524                         starting_offset = oap->oap_obj_off + starting_page_off;
2525                 } else if (oap->oap_page_off != 0)
2526                         break;
2527
2528                 /* in llite being 'ready' equates to the page being locked
2529                  * until completion unlocks it.  commit_write submits a page
2530                  * as not ready because its unlock will happen unconditionally
2531                  * as the call returns.  if we race with commit_write giving
2532                  * us that page we don't want to create a hole in the page
2533                  * stream, so we stop and leave the rpc to be fired by
2534                  * another dirtier or kupdated interval (the not ready page
2535                  * will still be on the dirty list).  we could call in
2536                  * at the end of ll_file_write to process the queue again. */
2537                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2538                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2539                                                     cmd);
2540                         if (rc < 0)
2541                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2542                                                 "instead of ready\n", oap,
2543                                                 oap->oap_page, rc);
2544                         switch (rc) {
2545                         case -EAGAIN:
2546                                 /* llite is telling us that the page is still
2547                                  * in commit_write and that we should try
2548                                  * to put it in an rpc again later.  we
2549                                  * break out of the loop so we don't create
2550                                  * a hole in the sequence of pages in the
2551                                  * rpc stream. */
2552                                 oap = NULL;
2553                                 break;
2554                         case -EINTR:
2555                                 /* the io isn't needed; tell the checks
2556                                  * below to complete the rpc with EINTR */
2557                                 cfs_spin_lock(&oap->oap_lock);
2558                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2559                                 cfs_spin_unlock(&oap->oap_lock);
2560                                 oap->oap_count = -EINTR;
2561                                 break;
2562                         case 0:
2563                                 cfs_spin_lock(&oap->oap_lock);
2564                                 oap->oap_async_flags |= ASYNC_READY;
2565                                 cfs_spin_unlock(&oap->oap_lock);
2566                                 break;
2567                         default:
2568                                 LASSERTF(0, "oap %p page %p returned %d "
2569                                             "from make_ready\n", oap,
2570                                             oap->oap_page, rc);
2571                                 break;
2572                         }
2573                 }
2574                 if (oap == NULL)
2575                         break;
2576
2577                 /* take the page out of our book-keeping */
2578                 cfs_list_del_init(&oap->oap_pending_item);
2579                 lop_update_pending(cli, lop, cmd, -1);
2580                 cfs_list_del_init(&oap->oap_urgent_item);
2581
2582                 /* ask the caller for the size of the io as the rpc leaves. */
2583                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2584                         oap->oap_count =
2585                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2586                                                       cmd);
2587                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2588                 }
2589                 if (oap->oap_count <= 0) {
2590                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2591                                oap->oap_count);
2592                         osc_ap_completion(env, cli, NULL,
2593                                           oap, 0, oap->oap_count);
2594                         continue;
2595                 }
2596
2597                 /* now put the page back in our accounting */
2598                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2599                 if (page_count++ == 0)
2600                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2601
2602                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2603                         mem_tight = 1;
2604
2605                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2606                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2607                  * have the same alignment as the initial writes that allocated
2608                  * extents on the server. */
2609                 ending_offset = oap->oap_obj_off + oap->oap_page_off +
2610                                 oap->oap_count;
2611                 if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
2612                         break;
2613
2614                 if (page_count >= cli->cl_max_pages_per_rpc)
2615                         break;
2616
2617                 /* If there is a gap at the end of this page, it can't merge
2618                  * with any subsequent pages, so we'll hand the network a
2619                  * "fragmented" page array that it can't transfer in 1 RDMA */
2620                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2621                         break;
2622         }
2623
2624         loi_list_maint(cli, loi);
2625
2626         client_obd_list_unlock(&cli->cl_loi_list_lock);
2627
2628         if (clob != NULL)
2629                 cl_object_put(env, clob);
2630
2631         if (page_count == 0) {
2632                 client_obd_list_lock(&cli->cl_loi_list_lock);
2633                 RETURN(0);
2634         }
2635
2636         req = osc_build_req(env, cli, &rpc_list, page_count,
2637                             mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2638         if (IS_ERR(req)) {
2639                 LASSERT(cfs_list_empty(&rpc_list));
2640                 loi_list_maint(cli, loi);
2641                 RETURN(PTR_ERR(req));
2642         }
2643
2644         aa = ptlrpc_req_async_args(req);
2645
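             /* Log where this rpc starts within a PTLRPC_MAX_BRW_SIZE window;
              * the offset histograms show how well rpcs stay aligned to the
              * full-RPC-size boundaries targeted by the page-selection loop. */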
2646         starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
2647         if (cmd == OBD_BRW_READ) {
2648                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2649                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2650                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2651                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2652         } else {
2653                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2654                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2655                                  cli->cl_w_in_flight);
2656                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2657                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2658         }
2659
2660         client_obd_list_lock(&cli->cl_loi_list_lock);
2661
2662         if (cmd == OBD_BRW_READ)
2663                 cli->cl_r_in_flight++;
2664         else
2665                 cli->cl_w_in_flight++;
2666
2667         /* queued sync pages can be torn down while the pages
2668          * were between the pending list and the rpc */
2669         tmp = NULL;
2670         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2671                 /* only one oap gets a request reference */
2672                 if (tmp == NULL)
2673                         tmp = oap;
2674                 if (oap->oap_interrupted && !req->rq_intr) {
2675                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2676                                oap, req);
2677                         ptlrpc_mark_interrupted(req);
2678                 }
2679         }
2680         if (tmp != NULL)
2681                 tmp->oap_request = ptlrpc_request_addref(req);
2682
2683         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2684                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2685
2686         req->rq_interpret_reply = brw_interpret;
2687
2688         /* XXX: Maybe the caller can check the RPC bulk descriptor to see which
2689          *      CPU/NUMA node the majority of pages were allocated on, and try
2690          *      to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED)
2691          *      to reduce cross-CPU memory traffic.
2692          *
2693          *      But on the other hand, we expect that multiple ptlrpcd threads
2694          *      and the initial write sponsor can run in parallel, especially
2695          *      when data checksum is enabled, which is CPU-bound operation and
2696          *      single ptlrpcd thread cannot process in time. So more ptlrpcd
2697          *      threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
2698          */
2699         ptlrpcd_add_req(req, pol, -1);
2700         RETURN(1);
2701 }
2702
2703 #define LOI_DEBUG(LOI, STR, args...)                                     \
2704         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2705                !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
2706                !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
2707                (LOI)->loi_write_lop.lop_num_pending,                     \
2708                !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
2709                (LOI)->loi_read_lop.lop_num_pending,                      \
2710                !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
2711                args)                                                     \
2712
2713 /* This is called by osc_check_rpcs() to find which objects have pages that
2714  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
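/* Selection priority, from the checks below:
 *   1. objects with blocked (high-priority) locks, so they flush fast and
 *      other clients can get the lock;
 *   2. objects with pages ready to be stuffed into RPCs;
 *   3. if there are cache waiters, any object with queued writes;
 *   4. on an invalid import, any object with queued pages, to flush them. */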
struct lov_oinfo *osc_next_loi(struct client_obd *cli)
{
        ENTRY;

        /* First return objects that have blocked locks so that they
         * will be flushed quickly and other clients can get the lock,
         * then objects which have pages ready to be stuffed into RPCs */
        if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
                RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
                                      struct lov_oinfo, loi_hp_ready_item));
        if (!cfs_list_empty(&cli->cl_loi_ready_list))
                RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
                                      struct lov_oinfo, loi_ready_item));

        /* then if we have cache waiters, return all objects with queued
         * writes.  This is especially important when many small files
         * have filled up the cache and not been fired into rpcs because
         * they don't pass the nr_pending/object threshold */
        if (!cfs_list_empty(&cli->cl_cache_waiters) &&
            !cfs_list_empty(&cli->cl_loi_write_list))
                RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
                                      struct lov_oinfo, loi_write_item));

        /* then return all queued objects when we have an invalid import
         * so that they get flushed */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
                if (!cfs_list_empty(&cli->cl_loi_write_list))
                        RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
                                              struct lov_oinfo,
                                              loi_write_item));
                if (!cfs_list_empty(&cli->cl_loi_read_list))
                        RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
                                              struct lov_oinfo, loi_read_item));
        }
        RETURN(NULL);
}

static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
{
        struct osc_async_page *oap;
        int hprpc = 0;

        if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
                oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
                                     struct osc_async_page, oap_urgent_item);
                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
        }

        if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
                oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
                                     struct osc_async_page, oap_urgent_item);
                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
        }

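        /* If the first urgent page on either list is high-priority, allow one
         * RPC above the normal limit: e.g. with cl_max_rpcs_in_flight == 8
         * and an ASYNC_HP page queued, a 9th RPC may start so that a blocked
         * lock is released promptly. */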
        return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
}

/* called with the loi list lock held */
static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli,
                            int ptlrpc)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        pdl_policy_t pol;
        ENTRY;

        pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                if (osc_max_rpc_in_flight(cli, loi))
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop, pol);
                        if (rc < 0) {
                                CERROR("Write request failed with %d\n", rc);

                                /* osc_send_oap_rpc failed, mostly because of
                                 * memory pressure.
                                 *
                                 * We must not break here, because if:
                                 *  - a page was submitted by osc_io_submit,
                                 *    so the page is locked;
                                 *  - no request is in flight;
                                 *  - no subsequent request will be issued;
                                 * then the system is live-locked: there is no
                                 * further chance to call osc_io_unplug() or
                                 * osc_check_rpcs().  pdflush can't help in
                                 * this case either, because it might block
                                 * grabbing the page lock mentioned above.
                                 *
                                 * So keep draining pages instead. */
                                /* break; */
                        }

                        if (rc > 0)
                                race_counter = 0;
                        else if (rc == 0)
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop, pol);
                        if (rc < 0)
                                CERROR("Read request failed with %d\n", rc);

                        if (rc > 0)
                                race_counter = 0;
                        else if (rc == 0)
                                race_counter++;
                }

                /* attempt some inter-object balancing by issuing rpcs
                 * for each object in turn */
                if (!cfs_list_empty(&loi->loi_hp_ready_item))
                        cfs_list_del_init(&loi->loi_hp_ready_item);
                if (!cfs_list_empty(&loi->loi_ready_item))
                        cfs_list_del_init(&loi->loi_ready_item);
                if (!cfs_list_empty(&loi->loi_write_item))
                        cfs_list_del_init(&loi->loi_write_item);
                if (!cfs_list_empty(&loi->loi_read_item))
                        cfs_list_del_init(&loi->loi_read_item);

                loi_list_maint(cli, loi);

                /* send_oap_rpc returns 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
}

void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
{
        osc_check_rpcs0(env, cli, 0);
}

/**
 * Non-blocking version of osc_enter_cache() that consumes grant only when it
 * is available.
 */
int osc_enter_cache_try(const struct lu_env *env,
                        struct client_obd *cli, struct lov_oinfo *loi,
                        struct osc_async_page *oap, int transient)
{
        int has_grant;

        has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
        if (has_grant) {
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                if (transient) {
                        cli->cl_dirty_transit += CFS_PAGE_SIZE;
                        cfs_atomic_inc(&obd_dirty_transit_pages);
                        oap->oap_brw_flags |= OBD_BRW_NOCACHE;
                }
        }
        return has_grant;
}
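
/* Each page entering the cache consumes CFS_PAGE_SIZE bytes of grant via
 * osc_consume_write_grant(): e.g. with cl_avail_grant == 1 MB and 4 KB
 * pages, up to 256 pages can enter before callers must wait in
 * osc_enter_cache() below. */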

/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space. */
static int osc_enter_cache(const struct lu_env *env,
                           struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
        int rc = -EDQUOT;
        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
            cli->cl_dirty_max < CFS_PAGE_SIZE     ||
            cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
            cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
            osc_enter_cache_try(env, cli, loi, oap, 0))
                RETURN(0);

        /* We can get here for two reasons: too many dirty pages in the cache,
         * or grant has run out.  In both cases we should write dirty pages
         * out.  Adding a cache waiter will trigger urgent write-out no matter
         * what the RPC size will be.
         * The exit condition is no available grant and no dirty pages cached;
         * that really means there is no space left on the OST. */
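        /* The loop below, in outline:
         *   1. link ocw onto cl_cache_waiters and kick osc_check_rpcs() so
         *      write-out starts draining dirty pages;
         *   2. drop the list lock and sleep until osc_wake_cache_waiters()
         *      grants us space and unlinks ocw (making ocw_entry empty);
         *   3. retry while the wake-up reports -EDQUOT and dirty pages
         *      remain; give up once nothing is left to write back. */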
        cfs_waitq_init(&ocw.ocw_waitq);
        ocw.ocw_oap = oap;
        while (cli->cl_dirty > 0) {
                cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                ocw.ocw_rc = 0;

                loi_list_maint(cli, loi);
                osc_check_rpcs(env, cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
                       cli->cl_import->imp_obd->obd_name, &ocw, oap);

                rc = l_wait_event(ocw.ocw_waitq,
                                  cfs_list_empty(&ocw.ocw_entry), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                cfs_list_del_init(&ocw.ocw_entry);
                if (rc < 0)
                        break;

                rc = ocw.ocw_rc;
                if (rc != -EDQUOT)
                        break;
        }

        RETURN(rc);
}


int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                        struct lov_oinfo *loi, cfs_page_t *page,
                        obd_off offset, const struct obd_async_page_ops *ops,
                        void *data, void **res, int nocache,
                        struct lustre_handle *lockh)
{
        struct osc_async_page *oap;

        ENTRY;

        if (!page)
                return cfs_size_round(sizeof(*oap));
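        /* Callers use this function in two phases: a first call with
         * page == NULL only reports how many bytes to reserve for an oap,
         * and a second call with *res pointing into that reservation
         * initializes it.  A sketch of the caller side (the buffer helper
         * is illustrative, not a real API):
         *
         *      int size = osc_prep_async_page(exp, NULL, NULL, NULL, 0,
         *                                     NULL, NULL, NULL, 0, NULL);
         *      oap = alloc_at_least(size);          // hypothetical helper
         *      osc_prep_async_page(exp, lsm, loi, page, off, ops, data,
         *                          (void **)&oap, 0, NULL);
         */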

        oap = *res;
        oap->oap_magic = OAP_MAGIC;
        oap->oap_cli = &exp->exp_obd->u.cli;
        oap->oap_loi = loi;

        oap->oap_caller_ops = ops;
        oap->oap_caller_data = data;

        oap->oap_page = page;
        oap->oap_obj_off = offset;
        if (!client_is_remote(exp) &&
            cfs_capable(CFS_CAP_SYS_RESOURCE))
                oap->oap_brw_flags = OBD_BRW_NOQUOTA;

        LASSERT(!(offset & ~CFS_PAGE_MASK));

        CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
        CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
        CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
        CFS_INIT_LIST_HEAD(&oap->oap_page_list);

        cfs_spin_lock_init(&oap->oap_lock);
        CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
        RETURN(0);
}

int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
                       struct lov_stripe_md *lsm, struct lov_oinfo *loi,
                       struct osc_async_page *oap, int cmd, int off,
                       int count, obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        int rc = 0;
        ENTRY;

        if (oap->oap_magic != OAP_MAGIC)
                RETURN(-EINVAL);

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (!cfs_list_empty(&oap->oap_pending_item) ||
            !cfs_list_empty(&oap->oap_urgent_item) ||
            !cfs_list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
                struct cl_object *obj;
                struct cl_attr    attr; /* XXX put attr into thread info */
                unsigned int qid[MAXQUOTAS];

                obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);

                cl_object_attr_lock(obj);
                rc = cl_object_attr_get(env, obj, &attr);
                cl_object_attr_unlock(obj);

                qid[USRQUOTA] = attr.cat_uid;
                qid[GRPQUOTA] = attr.cat_gid;
                if (rc == 0 &&
                    osc_quota_chkdq(cli, qid) == NO_QUOTA)
                        rc = -EDQUOT;
                if (rc)
                        RETURN(rc);
        }

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        LASSERT(off + count <= CFS_PAGE_SIZE);
        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        /* Give a hint to OST that requests are coming from kswapd - bug19529 */
        if (cfs_memory_pressure_get())
                oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
        cfs_spin_lock(&oap->oap_lock);
        oap->oap_async_flags = async_flags;
        cfs_spin_unlock(&oap->oap_lock);

        if (cmd & OBD_BRW_WRITE) {
                rc = osc_enter_cache(env, cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);
        if (!osc_max_rpc_in_flight(cli, loi) &&
            lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                LASSERT(cli->cl_writeback_work != NULL);
                rc = ptlrpcd_queue_work(cli->cl_writeback_work);

                CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
                       cli, rc);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}

/* aka (~was & now & flag), but this is more clear :) */
#define SETTING(was, now, flag) (!(was & flag) && (now & flag))
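/* e.g. SETTING(ASYNC_READY, ASYNC_READY | ASYNC_URGENT, ASYNC_URGENT) is
 * true: ASYNC_URGENT was clear in `was' and is set in `now'. */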

int osc_set_async_flags_base(struct client_obd *cli,
                             struct lov_oinfo *loi, struct osc_async_page *oap,
                             obd_flag async_flags)
{
        struct loi_oap_pages *lop;
        int flags = 0;
        ENTRY;

        LASSERT(!cfs_list_empty(&oap->oap_pending_item));

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        if ((oap->oap_async_flags & async_flags) == async_flags)
                RETURN(0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
            cfs_list_empty(&oap->oap_rpc_item)) {
                if (oap->oap_async_flags & ASYNC_HP)
                        cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                else
                        cfs_list_add_tail(&oap->oap_urgent_item,
                                          &lop->lop_urgent);
                flags |= ASYNC_URGENT;
                loi_list_maint(cli, loi);
        }
        cfs_spin_lock(&oap->oap_lock);
        oap->oap_async_flags |= flags;
        cfs_spin_unlock(&oap->oap_lock);

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
        RETURN(0);
}

int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                            struct lov_oinfo *loi, struct osc_async_page *oap)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        int rc = 0;
        ENTRY;

        if (oap->oap_magic != OAP_MAGIC)
                RETURN(-EINVAL);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (!cfs_list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!cfs_list_empty(&oap->oap_urgent_item)) {
                cfs_list_del_init(&oap->oap_urgent_item);
                cfs_spin_lock(&oap->oap_lock);
                oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
                cfs_spin_unlock(&oap->oap_lock);
        }
        if (!cfs_list_empty(&oap->oap_pending_item)) {
                cfs_list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);
        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}

static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
                                        struct ldlm_enqueue_info *einfo)
{
        void *data = einfo->ei_cbdata;
        int set = 0;

        LASSERT(lock != NULL);
        LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
        LASSERT(lock->l_resource->lr_type == einfo->ei_type);
        LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
        LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);

        lock_res_and_lock(lock);
        cfs_spin_lock(&osc_ast_guard);

        if (lock->l_ast_data == NULL)
                lock->l_ast_data = data;
        if (lock->l_ast_data == data)
                set = 1;

        cfs_spin_unlock(&osc_ast_guard);
        unlock_res_and_lock(lock);

        return set;
}

static int osc_set_data_with_check(struct lustre_handle *lockh,
                                   struct ldlm_enqueue_info *einfo)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
        int set = 0;

        if (lock != NULL) {
                set = osc_set_lock_data_with_check(lock, einfo);
                LDLM_LOCK_PUT(lock);
        } else
                CERROR("lockh %p, data %p - client evicted?\n",
                       lockh, einfo->ei_cbdata);
        return set;
}

static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
                             ldlm_iterator_t replace, void *data)
{
        struct ldlm_res_id res_id;
        struct obd_device *obd = class_exp2obd(exp);

        osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
        ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
        return 0;
}

/* find any ldlm lock of the inode in osc
 * return 0 if no lock is found
 *        1 if a lock is found
 *      < 0 on error */
static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
                           ldlm_iterator_t replace, void *data)
{
        struct ldlm_res_id res_id;
        struct obd_device *obd = class_exp2obd(exp);
        int rc = 0;

        osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
        rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
        if (rc == LDLM_ITER_STOP)
                return(1);
        if (rc == LDLM_ITER_CONTINUE)
                return(0);
        return(rc);
}

static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
                            obd_enqueue_update_f upcall, void *cookie,
                            int *flags, int agl, int rc)
{
        int intent = *flags & LDLM_FL_HAS_INTENT;
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;
                        rep = req_capsule_server_get(&req->rq_pill,
                                                     &RMF_DLM_REP);

                        LASSERT(rep != NULL);
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
            (rc == 0)) {
                *flags |= LDLM_FL_LVB_READY;
                CDEBUG(D_INODE, "got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, rc);
        RETURN(rc);
}

static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        __u32 mode;
        struct ost_lvb *lvb;
        __u32 lvb_len;
        int *flags = aa->oa_flags;

        /* Make a local copy of the lock handle and the mode, because aa->oa_*
         * might be freed anytime after the lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Let the CP AST grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
                lvb = NULL;
                lvb_len = 0;
        } else {
                lvb = aa->oa_lvb;
                lvb_len = sizeof(*aa->oa_lvb);
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, flags, lvb, lvb_len, &handle, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
                              flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
}

void osc_update_enqueue(struct lustre_handle *lov_lockhp,
                        struct lov_oinfo *loi, int flags,
                        struct ost_lvb *lvb, __u32 mode, int rc)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);

        if (rc == ELDLM_OK) {
                __u64 tmp;

                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                tmp = loi->loi_lvb.lvb_size;
                /* Extend KMS up to the end of this lock and no further.
                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
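                /* E.g. a lock on bytes [0, 4095] allows a KMS of at most
                 * 4096: even if the OST reports lvb_size == 10000, tmp is
                 * clamped to 4096 below. */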
                if (tmp > lock->l_policy_data.l_extent.end)
                        tmp = lock->l_policy_data.l_extent.end + 1;
                if (tmp >= loi->loi_kms) {
                        LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
                                   ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
                        loi_kms_set(loi, tmp);
                } else {
                        LDLM_DEBUG(lock, "lock acquired, setting rss="
                                   LPU64"; leaving kms="LPU64", end="LPU64,
                                   loi->loi_lvb.lvb_size, loi->loi_kms,
                                   lock->l_policy_data.l_extent.end);
                }
                ldlm_lock_allow_match(lock);
        } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                ldlm_lock_allow_match(lock);
                CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
                       " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
                rc = ELDLM_OK;
        }

        if (lock != NULL) {
                if (rc != ELDLM_OK)
                        ldlm_lock_fail_match(lock);

                LDLM_LOCK_PUT(lock);
        }
}
EXPORT_SYMBOL(osc_update_enqueue);

struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;

/* When enqueuing asynchronously, locks are not ordered, so we can obtain a
 * lock from the 2nd OSC before a lock from the 1st one. This does not
 * deadlock with other synchronous requests, but keeping some locks while
 * trying to obtain others may take a considerable amount of time if an OST
 * fails; and when other sync requests do not get a released lock from a
 * client, the client is excluded from the cluster -- such scenarios make
 * life difficult, so release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     int *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct lustre_handle *lockh,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;
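        /* E.g. with 4 KB pages (CFS_PAGE_MASK == ~4095ULL), a request for
         * bytes [5000, 9000] is widened to [4096, 12287]: the start is
         * rounded down to its page and the end up to the last byte of its
         * page. */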

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, lockh, 0);
        if (mode) {
                struct ldlm_lock *matched = ldlm_handle2lock(lockh);

                if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
                        /* For AGL, if an enqueue RPC was sent but the lock
                         * was not granted, skip processing this stripe.
                         * Return -ECANCELED to tell the caller. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;
                        /* addref the lock only if not async requests and PW
                         * lock is matched whereas we asked for PR. */
                        if (!rqset && einfo->ei_mode != mode)
                                ldlm_lock_addref(lockh, LCK_PR);
                        if (intent) {
                                /* I would like to be able to ASSERT here that
                                 * rss <= kms, but I can't, for reasons which
                                 * are explained in lov_enqueue() */
                        }

                        /* We already have a lock, and it's referenced */
                        (*upcall)(cookie, ELDLM_OK);

                        if (einfo->ei_mode != mode)
                                ldlm_lock_decref(lockh, LCK_PW);
                        else if (rqset)
                                /* For async requests, decref the lock. */
                                ldlm_lock_decref(lockh, einfo->ei_mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

 no_match:
        if (intent) {
                CFS_LIST_HEAD(cancels);
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof(*lvb));
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), lockh, async);
        if (rqset) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;
                        aa->oa_flags  = flags;
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_lvb    = lvb;
                        aa->oa_lockh  = lockh;
                        aa->oa_agl    = !!agl;

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}

static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                       struct ldlm_enqueue_info *einfo,
                       struct ptlrpc_request_set *rqset)
{
        struct ldlm_res_id res_id;
        int rc;
        ENTRY;

        osc_build_res_name(oinfo->oi_md->lsm_object_id,
                           oinfo->oi_md->lsm_object_seq, &res_id);

        rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                              oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
                              oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
                              rqset, rqset != NULL, 0);
        RETURN(rc);
}

int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                   int *flags, void *data, struct lustre_handle *lockh,
                   int unref)
{
        struct obd_device *obd = exp->exp_obd;
        int lflags = *flags;
        ldlm_mode_t rc;
        ENTRY;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
                RETURN(-EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        rc = mode;
        if (mode == LCK_PR)
                rc |= LCK_PW;
        rc = ldlm_lock_match(obd->obd_namespace, lflags,
                             res_id, type, policy, rc, lockh, unref);
        if (rc) {
                if (data != NULL) {
                        if (!osc_set_data_with_check(lockh, data)) {
                                if (!(lflags & LDLM_FL_TEST_LOCK))
                                        ldlm_lock_decref(lockh, rc);
                                RETURN(0);
                        }
                }
                if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
                RETURN(rc);
        }
        RETURN(rc);
}

int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
{
        ENTRY;

        if (unlikely(mode == LCK_GROUP))
                ldlm_lock_decref_and_cancel(lockh, mode);
        else
                ldlm_lock_decref(lockh, mode);

        RETURN(0);
}

static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
                      __u32 mode, struct lustre_handle *lockh)
{
        ENTRY;
        RETURN(osc_cancel_base(lockh, mode));
}

static int osc_cancel_unused(struct obd_export *exp,
                             struct lov_stripe_md *lsm,
                             ldlm_cancel_flags_t flags,
                             void *opaque)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct ldlm_res_id res_id, *resp = NULL;

        if (lsm != NULL) {
                resp = osc_build_res_name(lsm->lsm_object_id,
                                          lsm->lsm_object_seq, &res_id);
        }

        return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
}

static int osc_statfs_interpret(const struct lu_env *env,
                                struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obd_statfs *msfs;
        __u64 used;
        ENTRY;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up */
                RETURN(rc);

        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        /* Reinitialize the RDONLY and DEGRADED flags at the client
         * on each statfs, so they don't stay set permanently. */
        cfs_spin_lock(&cli->cl_oscc.oscc_lock);

        if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
        else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
                cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;

        if (unlikely(msfs->os_state & OS_STATE_READONLY))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
        else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
                cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;

        /* Add a bit of hysteresis so this flag isn't continually flapping,
         * and ensure that new files don't get extremely fragmented due to
         * only a small amount of available space in the filesystem.
         * We want to set the NOSPC flag when there is less than ~0.1% free
         * and clear it when there is at least ~0.2% free space, so:
         *                   avail < ~0.1% max          max = avail + used
         *            1025 * avail < avail + used       used = blocks - free
         *            1024 * avail < used
         *            1024 * avail < blocks - free
         *                   avail < ((blocks - free) >> 10)
         *
         * On a very large disk, say 16 TB, 0.1% is 16 GB. We don't want to
         * lose that much space, so in those cases we report no space left
         * if there is less than 1 GB left.                             */
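        /* Worked example: with 2^21 blocks in use, used = 2^21 >> 10 = 2048
         * (capped at 1 << 30), so NOSPC is set once os_bavail drops below
         * 2048 blocks and cleared only once it exceeds used << 1 = 4096
         * (and os_ffree > 64), giving the 2x hysteresis described above. */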
        used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
        if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
                     ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
        else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
                          (msfs->os_ffree > 64) &&
                          (msfs->os_bavail > (used << 1)))) {
                cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
                                             OSCC_FLAG_NOSPC_BLK);
        }

        if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
                     (msfs->os_bavail < used)))
                cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;

        cfs_spin_unlock(&cli->cl_oscc.oscc_lock);

        *aa->aa_oi->oi_osfs = *msfs;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, __u64 max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests should not wait for a reply,
                 * to avoid a deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}

static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /* Since the request might also come from lprocfs, we need to sync
         * this with client_disconnect_export() (bug 15684). */
        cfs_down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        cfs_up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests should not wait for a reply,
                 * to avoid a deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

/* Retrieve object striping information.
 *
 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 */
3816 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3817 {
3818         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3819         struct lov_user_md_v3 lum, *lumk;
3820         struct lov_user_ost_data_v1 *lmm_objects;
3821         int rc = 0, lum_size;
3822         ENTRY;
3823
3824         if (!lsm)
3825                 RETURN(-ENODATA);
3826
3827         /* we only need the header part from user space to get lmm_magic and
3828          * lmm_stripe_count, (the header part is common to v1 and v3) */
3829         lum_size = sizeof(struct lov_user_md_v1);
3830         if (cfs_copy_from_user(&lum, lump, lum_size))
3831                 RETURN(-EFAULT);
3832
3833         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3834             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3835                 RETURN(-EINVAL);
3836
3837         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3838         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3839         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3840         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3841
3842         /* we can use lov_mds_md_size() to compute lum_size
3843          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3844         if (lum.lmm_stripe_count > 0) {
3845                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3846                 OBD_ALLOC(lumk, lum_size);
3847                 if (!lumk)
3848                         RETURN(-ENOMEM);
3849
3850                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3851                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3852                 else
3853                         lmm_objects = &(lumk->lmm_objects[0]);
3854                 lmm_objects->l_object_id = lsm->lsm_object_id;
3855         } else {
3856                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3857                 lumk = &lum;
3858         }
3859
3860         lumk->lmm_object_id = lsm->lsm_object_id;
3861         lumk->lmm_object_seq = lsm->lsm_object_seq;
3862         lumk->lmm_stripe_count = 1;
3863
3864         if (cfs_copy_to_user(lump, lumk, lum_size))
3865                 rc = -EFAULT;
3866
3867         if (lumk != &lum)
3868                 OBD_FREE(lumk, lum_size);
3869
3870         RETURN(rc);
3871 }
3872
3873
3874 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3875                          void *karg, void *uarg)
3876 {
3877         struct obd_device *obd = exp->exp_obd;
3878         struct obd_ioctl_data *data = karg;
3879         int err = 0;
3880         ENTRY;
3881
3882         if (!cfs_try_module_get(THIS_MODULE)) {
3883                 CERROR("Can't get module. Is it alive?");
3884                 return -EINVAL;
3885         }
3886         switch (cmd) {
3887         case OBD_IOC_LOV_GET_CONFIG: {
3888                 char *buf;
3889                 struct lov_desc *desc;
3890                 struct obd_uuid uuid;
3891
3892                 buf = NULL;
3893                 len = 0;
3894                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3895                         GOTO(out, err = -EINVAL);
3896
3897                 data = (struct obd_ioctl_data *)buf;
3898
3899                 if (sizeof(*desc) > data->ioc_inllen1) {
3900                         obd_ioctl_freedata(buf, len);
3901                         GOTO(out, err = -EINVAL);
3902                 }
3903
3904                 if (data->ioc_inllen2 < sizeof(uuid)) {
3905                         obd_ioctl_freedata(buf, len);
3906                         GOTO(out, err = -EINVAL);
3907                 }
3908
3909                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3910                 desc->ld_tgt_count = 1;
3911                 desc->ld_active_tgt_count = 1;
3912                 desc->ld_default_stripe_count = 1;
3913                 desc->ld_default_stripe_size = 0;
3914                 desc->ld_default_stripe_offset = 0;
3915                 desc->ld_pattern = 0;
3916                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3917
3918                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3919
3920                 err = cfs_copy_to_user((void *)uarg, buf, len);
3921                 if (err)
3922                         err = -EFAULT;
3923                 obd_ioctl_freedata(buf, len);
3924                 GOTO(out, err);
3925         }
3926         case LL_IOC_LOV_SETSTRIPE:
3927                 err = obd_alloc_memmd(exp, karg);
3928                 if (err > 0)
3929                         err = 0;
3930                 GOTO(out, err);
3931         case LL_IOC_LOV_GETSTRIPE:
3932                 err = osc_getstripe(karg, uarg);
3933                 GOTO(out, err);
3934         case OBD_IOC_CLIENT_RECOVER:
3935                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3936                                             data->ioc_inlbuf1, 0);
3937                 if (err > 0)
3938                         err = 0;
3939                 GOTO(out, err);
3940         case IOC_OSC_SET_ACTIVE:
3941                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3942                                                data->ioc_offset);
3943                 GOTO(out, err);
3944         case OBD_IOC_POLL_QUOTACHECK:
3945                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3946                 GOTO(out, err);
3947         case OBD_IOC_PING_TARGET:
3948                 err = ptlrpc_obd_ping(obd);
3949                 GOTO(out, err);
3950         default:
3951                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3952                        cmd, cfs_curproc_comm());
3953                 GOTO(out, err = -ENOTTY);
3954         }
3955 out:
3956         cfs_module_put(THIS_MODULE);
3957         RETURN(err);
3958 }
3959
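     /*
      * For orientation: these ioctls normally arrive through the obd device
      * node rather than from direct in-kernel callers.  For example,
      * "lctl --device <osc> recover" is expected to land in the
      * OBD_IOC_CLIENT_RECOVER case above; the exact lctl plumbing is an
      * assumption here, not something this file defines.
      */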
3960 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3961                         obd_count keylen, void *key, __u32 *vallen, void *val,
3962                         struct lov_stripe_md *lsm)
3963 {
3964         ENTRY;
3965         if (!vallen || !val)
3966                 RETURN(-EFAULT);
3967
3968         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3969                 __u32 *stripe = val;
3970                 *vallen = sizeof(*stripe);
3971                 *stripe = 0;
3972                 RETURN(0);
3973         } else if (KEY_IS(KEY_LAST_ID)) {
3974                 struct ptlrpc_request *req;
3975                 obd_id                *reply;
3976                 char                  *tmp;
3977                 int                    rc;
3978
3979                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3980                                            &RQF_OST_GET_INFO_LAST_ID);
3981                 if (req == NULL)
3982                         RETURN(-ENOMEM);
3983
3984                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3985                                      RCL_CLIENT, keylen);
3986                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3987                 if (rc) {
3988                         ptlrpc_request_free(req);
3989                         RETURN(rc);
3990                 }
3991
3992                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3993                 memcpy(tmp, key, keylen);
3994
3995                 req->rq_no_delay = req->rq_no_resend = 1;
3996                 ptlrpc_request_set_replen(req);
3997                 rc = ptlrpc_queue_wait(req);
3998                 if (rc)
3999                         GOTO(out, rc);
4000
4001                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
4002                 if (reply == NULL)
4003                         GOTO(out, rc = -EPROTO);
4004
4005                 *((obd_id *)val) = *reply;
4006         out:
4007                 ptlrpc_req_finished(req);
4008                 RETURN(rc);
4009         } else if (KEY_IS(KEY_FIEMAP)) {
4010                 struct ptlrpc_request *req;
4011                 struct ll_user_fiemap *reply;
4012                 char *tmp;
4013                 int rc;
4014
4015                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
4016                                            &RQF_OST_GET_INFO_FIEMAP);
4017                 if (req == NULL)
4018                         RETURN(-ENOMEM);
4019
4020                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
4021                                      RCL_CLIENT, keylen);
4022                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
4023                                      RCL_CLIENT, *vallen);
4024                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
4025                                      RCL_SERVER, *vallen);
4026
4027                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
4028                 if (rc) {
4029                         ptlrpc_request_free(req);
4030                         RETURN(rc);
4031                 }
4032
4033                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
4034                 memcpy(tmp, key, keylen);
4035                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4036                 memcpy(tmp, val, *vallen);
4037
4038                 ptlrpc_request_set_replen(req);
4039                 rc = ptlrpc_queue_wait(req);
4040                 if (rc)
4041                         GOTO(out1, rc);
4042
4043                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4044                 if (reply == NULL)
4045                         GOTO(out1, rc = -EPROTO);
4046
4047                 memcpy(val, reply, *vallen);
4048         out1:
4049                 ptlrpc_req_finished(req);
4050
4051                 RETURN(rc);
4052         }
4053
4054         RETURN(-EINVAL);
4055 }
4056
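     /*
      * A minimal in-kernel caller sketch for the KEY_LAST_ID branch above,
      * assuming a valid export; the helper name and error handling are
      * illustrative only, so the block is compiled out.
      */
     #if 0
     static obd_id osc_query_last_id(const struct lu_env *env,
                                     struct obd_export *exp)
     {
             obd_id last_id = 0;
             __u32  vallen  = sizeof(last_id);

             /* This ends up in the synchronous OST_GET_INFO RPC above. */
             if (obd_get_info(env, exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
                              &vallen, &last_id, NULL) != 0)
                     return 0;
             return last_id;
     }
     #endif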
4057 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4058 {
4059         struct llog_ctxt *ctxt;
4060         int rc = 0;
4061         ENTRY;
4062
4063         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4064         if (ctxt) {
4065                 rc = llog_initiator_connect(ctxt);
4066                 llog_ctxt_put(ctxt);
4067         } else {
4068                 /* XXX return an error? skip setting below flags? */
4069         }
4070
4071         cfs_spin_lock(&imp->imp_lock);
4072         imp->imp_server_timeout = 1;
4073         imp->imp_pingable = 1;
4074         cfs_spin_unlock(&imp->imp_lock);
4075         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
4076
4077         RETURN(rc);
4078 }
4079
4080 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4081                                           struct ptlrpc_request *req,
4082                                           void *aa, int rc)
4083 {
4084         ENTRY;
4085         if (rc != 0)
4086                 RETURN(rc);
4087
4088         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
4089 }
4090
4091 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
4092                               obd_count keylen, void *key, obd_count vallen,
4093                               void *val, struct ptlrpc_request_set *set)
4094 {
4095         struct ptlrpc_request *req;
4096         struct obd_device     *obd = exp->exp_obd;
4097         struct obd_import     *imp = class_exp2cliimp(exp);
4098         char                  *tmp;
4099         int                    rc;
4100         ENTRY;
4101
4102         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4103
4104         if (KEY_IS(KEY_NEXT_ID)) {
4105                 obd_id new_val;
4106                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4107
4108                 if (vallen != sizeof(obd_id))
4109                         RETURN(-ERANGE);
4110                 if (val == NULL)
4111                         RETURN(-EINVAL);
4112
4116                 /* avoid a race between allocating a new object and setting
4117                  * the next id from the ll_sync thread */
4118                 cfs_spin_lock(&oscc->oscc_lock);
4119                 new_val = *((obd_id*)val) + 1;
4120                 if (new_val > oscc->oscc_next_id)
4121                         oscc->oscc_next_id = new_val;
4122                 cfs_spin_unlock(&oscc->oscc_lock);
4123                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4124                        exp->exp_obd->obd_name,
4125                        obd->u.cli.cl_oscc.oscc_next_id);
4126
4127                 RETURN(0);
4128         }
4129
4130         if (KEY_IS(KEY_CHECKSUM)) {
4131                 if (vallen != sizeof(int))
4132                         RETURN(-EINVAL);
4133                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4134                 RETURN(0);
4135         }
4136
4137         if (KEY_IS(KEY_SPTLRPC_CONF)) {
4138                 sptlrpc_conf_client_adapt(obd);
4139                 RETURN(0);
4140         }
4141
4142         if (KEY_IS(KEY_FLUSH_CTX)) {
4143                 sptlrpc_import_flush_my_ctx(imp);
4144                 RETURN(0);
4145         }
4146
4147         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4148                 RETURN(-EINVAL);
4149
4150         /* We pass all other commands directly to OST. Since nobody calls osc
4151            methods directly and everybody is supposed to go through LOV, we
4152            assume lov checked invalid values for us.
4153            The only recognised values so far are evict_by_nid and mds_conn.
4154            Even if something bad goes through, we'd get a -EINVAL from OST
4155            anyway. */
4156
4157         if (KEY_IS(KEY_GRANT_SHRINK))
4158                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4159         else
4160                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4161
4162         if (req == NULL)
4163                 RETURN(-ENOMEM);
4164
4165         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4166                              RCL_CLIENT, keylen);
4167         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4168                              RCL_CLIENT, vallen);
4169         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4170         if (rc) {
4171                 ptlrpc_request_free(req);
4172                 RETURN(rc);
4173         }
4174
4175         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4176         memcpy(tmp, key, keylen);
4177         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4178         memcpy(tmp, val, vallen);
4179
4180         if (KEY_IS(KEY_MDS_CONN)) {
4181                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4182
4183                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4184                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4185                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4186                 req->rq_no_delay = req->rq_no_resend = 1;
4187                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4188         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4189                 struct osc_grant_args *aa;
4190                 struct obdo *oa;
4191
4192                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4193                 aa = ptlrpc_req_async_args(req);
4194                 OBDO_ALLOC(oa);
4195                 if (!oa) {
4196                         ptlrpc_req_finished(req);
4197                         RETURN(-ENOMEM);
4198                 }
4199                 *oa = ((struct ost_body *)val)->oa;
4200                 aa->aa_oa = oa;
4201                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4202         }
4203
4204         ptlrpc_request_set_replen(req);
4205         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4206                 LASSERT(set != NULL);
4207                 ptlrpc_set_add_req(set, req);
4208                 ptlrpc_check_set(NULL, set);
4209         } else
4210                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
4211
4212         RETURN(0);
4213 }
4214
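     /*
      * A minimal caller sketch for the KEY_CHECKSUM branch above, assuming
      * a valid export.  KEY_CHECKSUM is consumed locally and returns before
      * the request-set check, so no set is needed; the helper name is an
      * assumption and the block is compiled out.
      */
     #if 0
     static int osc_toggle_checksum(const struct lu_env *env,
                                    struct obd_export *exp, int on)
     {
             return obd_set_info_async(env, exp, sizeof(KEY_CHECKSUM),
                                       KEY_CHECKSUM, sizeof(on), &on, NULL);
     }
     #endif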
4215
4216 static struct llog_operations osc_size_repl_logops = {
4217         lop_cancel: llog_obd_repl_cancel
4218 };
4219
4220 static struct llog_operations osc_mds_ost_orig_logops;
4221
4222 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4223                            struct obd_device *tgt, struct llog_catid *catid)
4224 {
4225         int rc;
4226         ENTRY;
4227
4228         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4229                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4230         if (rc) {
4231                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT: rc = %d\n", rc);
4232                 GOTO(out, rc);
4233         }
4234
4235         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4236                         NULL, &osc_size_repl_logops);
4237         if (rc) {
4238                 struct llog_ctxt *ctxt =
4239                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4240                 if (ctxt)
4241                         llog_cleanup(ctxt);
4242                 CERROR("failed LLOG_SIZE_REPL_CTXT: rc = %d\n", rc);
4243         }
4244         GOTO(out, rc);
4245 out:
4246         if (rc) {
4247                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4248                        obd->obd_name, tgt->obd_name, catid, rc);
4249                 CERROR("logid "LPX64":0x%x\n",
4250                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4251         }
4252         return rc;
4253 }
4254
4255 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4256                          struct obd_device *disk_obd, int *index)
4257 {
4258         struct llog_catid catid;
4259         static char name[32] = CATLIST;
4260         int rc;
4261         ENTRY;
4262
4263         LASSERT(olg == &obd->obd_olg);
4264
4265         cfs_mutex_lock(&olg->olg_cat_processing);
4266         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4267         if (rc) {
4268                 CERROR("failed to get llog catalog list: rc = %d\n", rc);
4269                 GOTO(out, rc);
4270         }
4271
4272         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4273                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4274                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4275
4276         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4277         if (rc) {
4278                 CERROR("llog initialization failed: rc = %d\n", rc);
4279                 GOTO(out, rc);
4280         }
4281
4282         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4283         if (rc) {
4284                 CERROR("failed to put llog catalog list: rc = %d\n", rc);
4285                 GOTO(out, rc);
4286         }
4287
4288  out:
4289         cfs_mutex_unlock(&olg->olg_cat_processing);
4290
4291         return rc;
4292 }
4293
4294 static int osc_llog_finish(struct obd_device *obd, int count)
4295 {
4296         struct llog_ctxt *ctxt;
4297         int rc = 0, rc2 = 0;
4298         ENTRY;
4299
4300         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4301         if (ctxt)
4302                 rc = llog_cleanup(ctxt);
4303
4304         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4305         if (ctxt)
4306                 rc2 = llog_cleanup(ctxt);
4307         if (!rc)
4308                 rc = rc2;
4309
4310         RETURN(rc);
4311 }
4312
4313 static int osc_reconnect(const struct lu_env *env,
4314                          struct obd_export *exp, struct obd_device *obd,
4315                          struct obd_uuid *cluuid,
4316                          struct obd_connect_data *data,
4317                          void *localdata)
4318 {
4319         struct client_obd *cli = &obd->u.cli;
4320
4321         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4322                 long lost_grant;
4323
4324                 client_obd_list_lock(&cli->cl_loi_list_lock);
4325                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4326                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
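                     /* Worked example, assuming 4 KiB pages: with
                      * cl_max_pages_per_rpc = 256 the fallback above asks for
                      * 2 * 256 * 4096 = 2 MiB of grant, i.e. room for two
                      * full-sized RPCs when no grant or dirty pages remain. */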
4327                 lost_grant = cli->cl_lost_grant;
4328                 cli->cl_lost_grant = 0;
4329                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4330
4331                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4332                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4333                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4334                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4335                        " ocd_grant: %d\n", data->ocd_connect_flags,
4336                        data->ocd_version, data->ocd_grant);
4337         }
4338
4339         RETURN(0);
4340 }
4341
4342 static int osc_disconnect(struct obd_export *exp)
4343 {
4344         struct obd_device *obd = class_exp2obd(exp);
4345         struct llog_ctxt  *ctxt;
4346         int rc;
4347
4348         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4349         if (ctxt) {
4350                 if (obd->u.cli.cl_conn_count == 1) {
4351                         /* Flush any remaining cancel messages out to the
4352                          * target */
4353                         llog_sync(ctxt, exp);
4354                 }
4355                 llog_ctxt_put(ctxt);
4356         } else {
4357                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4358                        obd);
4359         }
4360
4361         rc = client_disconnect_export(exp);
4362         /**
4363          * Initially we put del_shrink_grant before disconnect_export, but that
4364          * causes the following problem if setup (connect) and cleanup
4365          * (disconnect) are tangled together.
4366          *      connect p1                     disconnect p2
4367          *   ptlrpc_connect_import
4368          *     ...............               class_manual_cleanup
4369          *                                     osc_disconnect
4370          *                                     del_shrink_grant
4371          *   ptlrpc_connect_interpret
4372          *     init_grant_shrink
4373          *   add this client to the shrink list
4374          *                                      cleanup_osc
4375          * Bang! the pinger triggers the shrink.
4376          * So the osc should be removed from the shrink list only after we
4377          * are sure the import has been destroyed. BUG18662
4378          */
4379         if (obd->u.cli.cl_import == NULL)
4380                 osc_del_shrink_grant(&obd->u.cli);
4381         return rc;
4382 }
4383
4384 static int osc_import_event(struct obd_device *obd,
4385                             struct obd_import *imp,
4386                             enum obd_import_event event)
4387 {
4388         struct client_obd *cli;
4389         int rc = 0;
4390
4391         ENTRY;
4392         LASSERT(imp->imp_obd == obd);
4393
4394         switch (event) {
4395         case IMP_EVENT_DISCON: {
4396                 /* Only do this on the MDS OSCs */
4397                 if (imp->imp_server_timeout) {
4398                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4399
4400                         cfs_spin_lock(&oscc->oscc_lock);
4401                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4402                         cfs_spin_unlock(&oscc->oscc_lock);
4403                 }
4404                 cli = &obd->u.cli;
4405                 client_obd_list_lock(&cli->cl_loi_list_lock);
4406                 cli->cl_avail_grant = 0;
4407                 cli->cl_lost_grant = 0;
4408                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4409                 break;
4410         }
4411         case IMP_EVENT_INACTIVE: {
4412                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4413                 break;
4414         }
4415         case IMP_EVENT_INVALIDATE: {
4416                 struct ldlm_namespace *ns = obd->obd_namespace;
4417                 struct lu_env         *env;
4418                 int                    refcheck;
4419
4420                 env = cl_env_get(&refcheck);
4421                 if (!IS_ERR(env)) {
4422                         /* Reset grants */
4423                         cli = &obd->u.cli;
4424                         client_obd_list_lock(&cli->cl_loi_list_lock);
4425                         /* all pages go to failing RPCs due to the invalid
4426                          * import */
4427                         osc_check_rpcs(env, cli);
4428                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4429
4430                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4431                         cl_env_put(env, &refcheck);
4432                 } else
4433                         rc = PTR_ERR(env);
4434                 break;
4435         }
4436         case IMP_EVENT_ACTIVE: {
4437                 /* Only do this on the MDS OSCs */
4438                 if (imp->imp_server_timeout) {
4439                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4440
4441                         cfs_spin_lock(&oscc->oscc_lock);
4442                         oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
4443                                               OSCC_FLAG_NOSPC_BLK);
4444                         cfs_spin_unlock(&oscc->oscc_lock);
4445                 }
4446                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4447                 break;
4448         }
4449         case IMP_EVENT_OCD: {
4450                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4451
4452                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4453                         osc_init_grant(&obd->u.cli, ocd);
4454
4455                 /* See bug 7198 */
4456                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4457                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
4458
4459                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4460                 break;
4461         }
4462         case IMP_EVENT_DEACTIVATE: {
4463                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
4464                 break;
4465         }
4466         case IMP_EVENT_ACTIVATE: {
4467                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
4468                 break;
4469         }
4470         default:
4471                 CERROR("Unknown import event %d\n", event);
4472                 LBUG();
4473         }
4474         RETURN(rc);
4475 }
4476
4477 /**
4478  * Determine whether the lock can be canceled before replaying the lock
4479  * during recovery, see bug16774 for detailed information.
4480  *
4481  * \retval zero the lock can't be canceled
4482  * \retval other ok to cancel
4483  */
4484 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4485 {
4486         check_res_locked(lock->l_resource);
4487
4488         /*
4489          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
4490          *
4491          * XXX as a future improvement, we can also cancel unused write lock
4492          * if it doesn't have dirty data and active mmaps.
4493          */
4494         if (lock->l_resource->lr_type == LDLM_EXTENT &&
4495             (lock->l_granted_mode == LCK_PR ||
4496              lock->l_granted_mode == LCK_CR) &&
4497             (osc_dlm_lock_pageref(lock) == 0))
4498                 RETURN(1);
4499
4500         RETURN(0);
4501 }
4502
4503 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4504 {
4505         struct client_obd *cli = &obd->u.cli;
4506         int rc;
4507         ENTRY;
4508
4510         rc = ptlrpcd_addref();
4511         if (rc)
4512                 RETURN(rc);
4513
4514         rc = client_obd_setup(obd, lcfg);
4515         if (rc == 0) {
4516                 void *handler;
4517                 handler = ptlrpcd_alloc_work(cli->cl_import,
4518                                              brw_queue_work, cli);
4519                 if (!IS_ERR(handler))
4520                         cli->cl_writeback_work = handler;
4521                 else
4522                         rc = PTR_ERR(handler);
4523         }
4524
4525         if (rc == 0) {
4526                 struct lprocfs_static_vars lvars = { 0 };
4527
4528                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4529                 lprocfs_osc_init_vars(&lvars);
4530                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4531                         lproc_osc_attach_seqstat(obd);
4532                         sptlrpc_lprocfs_cliobd_attach(obd);
4533                         ptlrpc_lprocfs_register_obd(obd);
4534                 }
4535
4536                 oscc_init(obd);
4537                 /* We need to allocate a few extra requests, because
4538                    brw_interpret tries to create new requests before freeing
4539                    previous ones. Ideally we want 2x max_rpcs_in_flight
4540                    reserved, but that might waste too much RAM in practice,
4541                    so reserving 2 extra is a guess that should still work. */
4542                 cli->cl_import->imp_rq_pool =
4543                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4544                                             OST_MAXREQSIZE,
4545                                             ptlrpc_add_rqs_to_pool);
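                     /* e.g. with a default cl_max_rpcs_in_flight of 8 this
                      * pre-allocates 8 + 2 = 10 requests of OST_MAXREQSIZE
                      * bytes each; the default value is an assumption and may
                      * differ by configuration. */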
4546
4547                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4548
4549                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4550         }
4551
4552         if (rc)
4553                 ptlrpcd_decref();
4554         RETURN(rc);
4555 }
4556
4557 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4558 {
4559         int rc = 0;
4560         ENTRY;
4561
4562         switch (stage) {
4563         case OBD_CLEANUP_EARLY: {
4564                 struct obd_import *imp;
4565                 imp = obd->u.cli.cl_import;
4566                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4567                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4568                 ptlrpc_deactivate_import(imp);
4569                 cfs_spin_lock(&imp->imp_lock);
4570                 imp->imp_pingable = 0;
4571                 cfs_spin_unlock(&imp->imp_lock);
4572                 break;
4573         }
4574         case OBD_CLEANUP_EXPORTS: {
4575                 struct client_obd *cli = &obd->u.cli;
4576                 /* LU-464
4577                  * for echo client, export may be on zombie list, wait for
4578                  * zombie thread to cull it, because cli.cl_import will be
4579                  * cleared in client_disconnect_export():
4580                  *   class_export_destroy() -> obd_cleanup() ->
4581                  *   echo_device_free() -> echo_client_cleanup() ->
4582                  *   obd_disconnect() -> osc_disconnect() ->
4583                  *   client_disconnect_export()
4584                  */
4585                 obd_zombie_barrier();
4586                 if (cli->cl_writeback_work) {
4587                         ptlrpcd_destroy_work(cli->cl_writeback_work);
4588                         cli->cl_writeback_work = NULL;
4589                 }
4590                 obd_cleanup_client_import(obd);
4591                 ptlrpc_lprocfs_unregister_obd(obd);
4592                 lprocfs_obd_cleanup(obd);
4593                 rc = obd_llog_finish(obd, 0);
4594                 if (rc != 0)
4595                         CERROR("failed to cleanup llogging subsystems\n");
4596                 break;
4597         }
4598         }
4599         RETURN(rc);
4600 }
4601
4602 int osc_cleanup(struct obd_device *obd)
4603 {
4604         int rc;
4605
4606         ENTRY;
4607
4608         /* free memory of osc quota cache */
4609         osc_quota_cleanup(obd);
4610
4611         rc = client_obd_cleanup(obd);
4612
4613         ptlrpcd_decref();
4614         RETURN(rc);
4615 }
4616
4617 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4618 {
4619         struct lprocfs_static_vars lvars = { 0 };
4620         int rc = 0;
4621
4622         lprocfs_osc_init_vars(&lvars);
4623
4624         switch (lcfg->lcfg_command) {
4625         default:
4626                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4627                                               lcfg, obd);
4628                 if (rc > 0)
4629                         rc = 0;
4630                 break;
4631         }
4632
4633         return(rc);
4634 }
4635
4636 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4637 {
4638         return osc_process_config_base(obd, buf);
4639 }
4640
4641 struct obd_ops osc_obd_ops = {
4642         .o_owner                = THIS_MODULE,
4643         .o_setup                = osc_setup,
4644         .o_precleanup           = osc_precleanup,
4645         .o_cleanup              = osc_cleanup,
4646         .o_add_conn             = client_import_add_conn,
4647         .o_del_conn             = client_import_del_conn,
4648         .o_connect              = client_connect_import,
4649         .o_reconnect            = osc_reconnect,
4650         .o_disconnect           = osc_disconnect,
4651         .o_statfs               = osc_statfs,
4652         .o_statfs_async         = osc_statfs_async,
4653         .o_packmd               = osc_packmd,
4654         .o_unpackmd             = osc_unpackmd,
4655         .o_precreate            = osc_precreate,
4656         .o_create               = osc_create,
4657         .o_create_async         = osc_create_async,
4658         .o_destroy              = osc_destroy,
4659         .o_getattr              = osc_getattr,
4660         .o_getattr_async        = osc_getattr_async,
4661         .o_setattr              = osc_setattr,
4662         .o_setattr_async        = osc_setattr_async,
4663         .o_brw                  = osc_brw,
4664         .o_punch                = osc_punch,
4665         .o_sync                 = osc_sync,
4666         .o_enqueue              = osc_enqueue,
4667         .o_change_cbdata        = osc_change_cbdata,
4668         .o_find_cbdata          = osc_find_cbdata,
4669         .o_cancel               = osc_cancel,
4670         .o_cancel_unused        = osc_cancel_unused,
4671         .o_iocontrol            = osc_iocontrol,
4672         .o_get_info             = osc_get_info,
4673         .o_set_info_async       = osc_set_info_async,
4674         .o_import_event         = osc_import_event,
4675         .o_llog_init            = osc_llog_init,
4676         .o_llog_finish          = osc_llog_finish,
4677         .o_process_config       = osc_process_config,
4678         .o_quotactl             = osc_quotactl,
4679         .o_quotacheck           = osc_quotacheck,
4680         .o_quota_adjust_qunit   = osc_quota_adjust_qunit,
4681 };
4682
4683 extern struct lu_kmem_descr osc_caches[];
4684 extern cfs_spinlock_t       osc_ast_guard;
4685 extern cfs_lock_class_key_t osc_ast_guard_class;
4686
4687 int __init osc_init(void)
4688 {
4689         struct lprocfs_static_vars lvars = { 0 };
4690         int rc;
4691         ENTRY;
4692
4693         /* Print the address of _any_ initialized kernel symbol from this
4694          * module, to allow debugging with a gdb that doesn't load data
4695          * symbols from modules. */
4696         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
4697
4698         rc = lu_kmem_init(osc_caches);
             if (rc)
                     RETURN(rc);
4699
4700         lprocfs_osc_init_vars(&lvars);
4701
4702         osc_quota_init();
4703         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4704                                  LUSTRE_OSC_NAME, &osc_device_type);
4705         if (rc) {
4706                 lu_kmem_fini(osc_caches);
4707                 RETURN(rc);
4708         }
4709
4710         cfs_spin_lock_init(&osc_ast_guard);
4711         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4712
4713         osc_mds_ost_orig_logops = llog_lvfs_ops;
4714         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4715         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4716         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4717         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4718
4719         RETURN(rc);
4720 }
4721
4722 #ifdef __KERNEL__
4723 static void /*__exit*/ osc_exit(void)
4724 {
4725         osc_quota_exit();
4726         class_unregister_type(LUSTRE_OSC_NAME);
4727         lu_kmem_fini(osc_caches);
4728 }
4729
4730 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4731 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4732 MODULE_LICENSE("GPL");
4733
4734 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4735 #endif