@@ -4,8 +4,6 @@ use std::str::FromStr;
44use ndarray:: Zip ;
55use pgrx:: iter:: { SetOfIterator , TableIterator } ;
66use pgrx:: * ;
7- use pyo3:: prelude:: * ;
8- use pyo3:: types:: { IntoPyDict , PyDict } ;
97
108#[ cfg( feature ="python" ) ]
119use serde_json:: json;
@@ -634,40 +632,6 @@ pub fn transform_string(
634632}
635633}
636634
637- struct TransformStreamIterator {
638- locals : Py < PyDict > ,
639- }
640-
641- impl TransformStreamIterator {
642- fn new ( python_iter : Py < PyAny > ) ->Self {
643- let locals =Python :: with_gil ( |py| ->Result < Py < PyDict > , PyErr > {
644- Ok ( [ ( "python_iter" , python_iter) ] . into_py_dict ( py) . into ( ) )
645- } )
646- . map_err ( |e|error ! ( "{e}" ) )
647- . unwrap ( ) ;
648- Self { locals}
649- }
650- }
651-
652- impl Iterator for TransformStreamIterator {
653- type Item =String ;
654- fn next ( & mut self ) ->Option < Self :: Item > {
655- // We can unwrap this becuase if there is an error the current transaction is aborted in the map_err call
656- Python :: with_gil ( |py| ->Result < Option < String > , PyErr > {
657- let code ="next(python_iter)" ;
658- let res: & PyAny = py. eval ( code, Some ( self . locals . as_ref ( py) ) , None ) ?;
659- if res. is_none ( ) {
660- Ok ( None )
661- } else {
662- let res: String = res. extract ( ) ?;
663- Ok ( Some ( res) )
664- }
665- } )
666- . map_err ( |e|error ! ( "{e}" ) )
667- . unwrap ( )
668- }
669- }
670-
671635#[ cfg( all( feature ="python" , not( feature ="use_as_lib" ) ) ) ]
672636#[ pg_extern( immutable, parallel_safe, name ="transform_stream" ) ]
673637#[ allow( unused_variables) ] // cache is maintained for api compatibility
@@ -678,11 +642,11 @@ pub fn transform_stream_json(
678642cache : default ! ( bool , false ) ,
679643) ->SetOfIterator < ' static , String > {
680644// We can unwrap this becuase if there is an error the current transaction is aborted in the map_err call
681- let python_iter =crate :: bindings :: transformers :: transform_stream ( & task . 0 , & args . 0 , input )
682- . map_err ( |e| error ! ( "{e}" ) )
683- . unwrap ( ) ;
684- let res = TransformStreamIterator :: new ( python_iter ) ;
685- SetOfIterator :: new ( res )
645+ let python_iter =
646+ crate :: bindings :: transformers :: transform_stream_iterator ( & task . 0 , & args . 0 , input )
647+ . map_err ( |e| error ! ( "{e}" ) )
648+ . unwrap ( ) ;
649+ SetOfIterator :: new ( python_iter )
686650}
687651
688652#[ cfg( all( feature ="python" , not( feature ="use_as_lib" ) ) ) ]
@@ -696,11 +660,11 @@ pub fn transform_stream_string(
696660) ->SetOfIterator < ' static , String > {
697661let task_json =json ! ( { "task" : task} ) ;
698662// We can unwrap this becuase if there is an error the current transaction is aborted in the map_err call
699- let python_iter =crate :: bindings :: transformers :: transform_stream ( & task_json , & args . 0 , input )
700- . map_err ( |e| error ! ( "{e}" ) )
701- . unwrap ( ) ;
702- let res = TransformStreamIterator :: new ( python_iter ) ;
703- SetOfIterator :: new ( res )
663+ let python_iter =
664+ crate :: bindings :: transformers :: transform_stream_iterator ( & task_json , & args . 0 , input )
665+ . map_err ( |e| error ! ( "{e}" ) )
666+ . unwrap ( ) ;
667+ SetOfIterator :: new ( python_iter )
704668}
705669
706670#[ cfg( feature ="python" ) ]