@@ -344,7 +344,7 @@ def get_key(item):
344344q_in .put (job )
345345
346346
347- def builtin_transform_data (topic :str ,payload :t .AnyStr )-> TdataType :
347+ def builtin_transform_data (topic :str ,payload :t .Union [ str , bytes ] )-> TdataType :
348348"""Return a dict with initial transformation data which is made
349349 available to all plugins"""
350350
@@ -397,11 +397,17 @@ def xform(function: str, orig_value: t.Any, transform_data: TdataType) -> t.Unio
397397return res
398398
399399
400- def decode_payload (section :str ,topic :str ,payload :t .AnyStr )-> TdataType :
400+ def decode_payload (section :str ,topic :str ,payload :t .Union [ str , bytes ] )-> TdataType :
401401"""
402402 Decode message payload through transformation machinery.
403403 """
404404
405+ if isinstance (payload ,bytes ):
406+ try :
407+ payload = payload .decode ("utf-8" )
408+ except Exception as ex :
409+ logger .debug (f"Decoding from UTF-8 failed:{ ex } . payload={ truncate (payload )} " )
410+
405411transform_data = builtin_transform_data (topic ,payload )
406412
407413topic_data = context .get_topic_data (section ,transform_data )
@@ -423,12 +429,9 @@ def decode_payload(section: str, topic: str, payload: t.AnyStr) -> TdataType:
423429# the JSON keys into item to pass to the plugin, and create the outgoing
424430# (i.e. transformed) message.
425431try :
426- if isinstance (payload ,bytes ):
427- outdata = payload .decode ("utf-8" )
428- else :
429- outdata = payload
430- outdata = outdata .rstrip ("\0 " )
431- payload_data = json .loads (outdata )
432+ if isinstance (payload ,str ):
433+ payload = payload .rstrip ("\0 " )
434+ payload_data = json .loads (payload )
432435transform_data .update (payload_data )
433436except Exception as ex :
434437logger .debug (f"Decoding JSON failed:{ ex } . payload={ truncate (payload )} " )