1212*/
1313package io .kubernetes .client .util ;
1414
15- import com .google .gson .JsonParseException ;
1615import com .google .gson .annotations .SerializedName ;
1716import com .google .gson .reflect .TypeToken ;
17+ import com .google .gson .stream .JsonReader ;
1818import com .squareup .okhttp .Call ;
1919import com .squareup .okhttp .ResponseBody ;
2020import io .kubernetes .client .ApiClient ;
2121import io .kubernetes .client .ApiException ;
2222import io .kubernetes .client .JSON ;
2323import io .kubernetes .client .models .V1Status ;
2424import java .io .IOException ;
25+ import java .io .StringReader ;
2526import java .lang .reflect .Type ;
2627import java .util .Iterator ;
2728import org .slf4j .Logger ;
@@ -112,7 +113,7 @@ public static <T> Watch<T> createWatch(ApiClient client, Call call, Type watchTy
112113 }
113114 }
114115
115- private Watch (JSON json , ResponseBody body , Type watchType , Call call ) {
116+ protected Watch (JSON json , ResponseBody body , Type watchType , Call call ) {
116117 this .response = body ;
117118 this .watchType = watchType ;
118119 this .json = json ;
@@ -125,18 +126,60 @@ public Response<T> next() {
125126 if (line == null ) {
126127 throw new RuntimeException ("Null response from the server." );
127128 }
128- try {
129- return json .deserialize (line , watchType );
130- } catch (JsonParseException ex ) {
131- Type statusType = new TypeToken <Response <V1Status >>() {}.getType ();
132- Response <V1Status > status = json .deserialize (line , statusType );
133- return new Response <T >(status .type , status .object );
134- }
129+ return parseLine (line );
135130 } catch (IOException e ) {
136131 throw new RuntimeException ("IO Exception during next method." , e );
137132 }
138133 }
139134
135+ protected boolean isStatus (String line ) throws IOException {
136+ boolean found = false ;
137+ JsonReader reader = new JsonReader (new StringReader (line ));
138+ reader .beginObject ();
139+ // extract object data.
140+ while (reader .hasNext ()) {
141+ String name = reader .nextName ();
142+ if (name .equals ("object" )) {
143+ found = true ;
144+ break ;
145+ }
146+ reader .skipValue ();
147+ }
148+ if (!found ) {
149+ return false ;
150+ }
151+
152+ String kind = null ;
153+ String apiVersion = null ;
154+ reader .beginObject ();
155+ while (reader .hasNext ()) {
156+ String name = reader .nextName ();
157+ if (name .equals ("kind" )) {
158+ kind = reader .nextString ();
159+ } else if (name .equals ("apiVersion" )) {
160+ apiVersion = reader .nextString ();
161+ } else {
162+ reader .skipValue ();
163+ }
164+ if (apiVersion != null && kind != null ) {
165+ break ;
166+ }
167+ }
168+ if ("Status" .equals (kind ) && "v1" .equals (apiVersion )) {
169+ return true ;
170+ }
171+ return false ;
172+ }
173+
174+ protected Response <T > parseLine (String line ) throws IOException {
175+ if (!isStatus (line )) {
176+ return json .deserialize (line , watchType );
177+ }
178+ Type statusType = new TypeToken <Response <V1Status >>() {}.getType ();
179+ Response <V1Status > status = json .deserialize (line , statusType );
180+ return new Response <T >(status .type , status .object );
181+ }
182+
140183 public boolean hasNext () {
141184 try {
142185 return !response .source ().exhausted ();
0 commit comments