4242
4343# Signature bytes for each message type
4444INIT = b"\x01 " # 0000 0001 // INIT <user_agent>
45- ACK_FAILURE = b"\x0F " # 0000 1111 // ACK_FAILURE
45+ RESET = b"\x0F " # 0000 1111 // RESET
4646RUN = b"\x10 " # 0001 0000 // RUN <statement> <parameters>
4747DISCARD_ALL = b"\x2F " # 0010 1111 // DISCARD *
4848PULL_ALL = b"\x3F " # 0011 1111 // PULL *
5656
5757message_names = {
5858 INIT : "INIT" ,
59- ACK_FAILURE : "ACK_FAILURE " ,
59+ RESET : "RESET " ,
6060 RUN : "RUN" ,
6161 DISCARD_ALL : "DISCARD_ALL" ,
6262 PULL_ALL : "PULL_ALL" ,
@@ -200,12 +200,6 @@ def on_ignored(self, metadata=None):
200200 pass
201201
202202
203- class AckFailureResponse (Response ):
204-
205- def on_failure (self , metadata ):
206- raise ProtocolError ("Could not acknowledge failure" )
207-
208-
209203class Connection (object ):
210204 """ Server connection through which all protocol messages
211205 are sent and received. This class is designed for protocol
@@ -215,6 +209,7 @@ class Connection(object):
215209 """
216210
217211 def __init__ (self , sock , ** config ):
212+ self .defunct = False
218213 self .channel = ChunkChannel (sock )
219214 self .packer = Packer (self .channel )
220215 self .responses = deque ()
@@ -237,6 +232,10 @@ def on_failure(metadata):
237232
238233 def append (self , signature , fields = (), response = None ):
239234 """ Add a message to the outgoing queue.
235+
236+ :arg signature: the signature of the message
237+ :arg fields: the fields of the message as a tuple
238+ :arg response: a response object to handle callbacks
240239 """
241240 if __debug__ :
242241 log_info ("C: %s %s" , message_names [signature ], " " .join (map (repr , fields )))
@@ -247,6 +246,18 @@ def append(self, signature, fields=(), response=None):
247246 self .channel .flush (end_of_message = True )
248247 self .responses .append (response )
249248
249+ def append_reset (self ):
250+ """ Add a RESET message to the outgoing queue.
251+ """
252+
253+ def on_failure (metadata ):
254+ raise ProtocolError ("Reset failed" )
255+
256+ response = Response (self )
257+ response .on_failure = on_failure
258+
259+ self .append (RESET , response = response )
260+
250261 def send (self ):
251262 """ Send all queued messages to the server.
252263 """
@@ -257,8 +268,12 @@ def fetch_next(self):
257268 """
258269 raw = BytesIO ()
259270 unpack = Unpacker (raw ).unpack
260- raw .writelines (self .channel .chunk_reader ())
261-
271+ try :
272+ raw .writelines (self .channel .chunk_reader ())
273+ except ProtocolError :
274+ self .defunct = True
275+ self .close ()
276+ return
262277 # Unpack from the raw byte stream and call the relevant message handler(s)
263278 raw .seek (0 )
264279 response = self .responses [0 ]
@@ -276,7 +291,7 @@ def fetch_next(self):
276291 response .complete = True
277292 self .responses .popleft ()
278293 if signature == FAILURE :
279- self .append ( ACK_FAILURE , response = AckFailureResponse ( self ) )
294+ self .append_reset ( )
280295 raw .close ()
281296
282297 def close (self ):
0 commit comments