@@ -4,6 +4,8 @@ use std::str::FromStr;
4
4
use ndarray:: Zip ;
5
5
use pgrx:: iter:: { SetOfIterator , TableIterator } ;
6
6
use pgrx:: * ;
7
+ use pyo3:: prelude:: * ;
8
+ use pyo3:: types:: { IntoPyDict , PyDict } ;
7
9
8
10
#[ cfg( feature ="python" ) ]
9
11
use serde_json:: json;
@@ -632,6 +634,75 @@ pub fn transform_string(
632
634
}
633
635
}
634
636
637
+ struct TransformStreamIterator {
638
+ locals : Py < PyDict > ,
639
+ }
640
+
641
+ impl TransformStreamIterator {
642
+ fn new ( python_iter : Py < PyAny > ) ->Self {
643
+ let locals =Python :: with_gil ( |py| ->Result < Py < PyDict > , PyErr > {
644
+ Ok ( [ ( "python_iter" , python_iter) ] . into_py_dict ( py) . into ( ) )
645
+ } )
646
+ . map_err ( |e|error ! ( "{e}" ) )
647
+ . unwrap ( ) ;
648
+ Self { locals}
649
+ }
650
+ }
651
+
652
+ impl Iterator for TransformStreamIterator {
653
+ type Item =String ;
654
+ fn next ( & mut self ) ->Option < Self :: Item > {
655
+ // We can unwrap this becuase if there is an error the current transaction is aborted in the map_err call
656
+ Python :: with_gil ( |py| ->Result < Option < String > , PyErr > {
657
+ let code ="next(python_iter)" ;
658
+ let res: & PyAny = py. eval ( code, Some ( self . locals . as_ref ( py) ) , None ) ?;
659
+ if res. is_none ( ) {
660
+ Ok ( None )
661
+ } else {
662
+ let res: String = res. extract ( ) ?;
663
+ Ok ( Some ( res) )
664
+ }
665
+ } )
666
+ . map_err ( |e|error ! ( "{e}" ) )
667
+ . unwrap ( )
668
+ }
669
+ }
670
+
671
+ #[ cfg( all( feature ="python" , not( feature ="use_as_lib" ) ) ) ]
672
+ #[ pg_extern( immutable, parallel_safe, name ="transform_stream" ) ]
673
+ #[ allow( unused_variables) ] // cache is maintained for api compatibility
674
+ pub fn transform_stream_json (
675
+ task : JsonB ,
676
+ args : default ! ( JsonB , "'{}'" ) ,
677
+ input : default ! ( & str , "''" ) ,
678
+ cache : default ! ( bool , false ) ,
679
+ ) ->SetOfIterator < ' static , String > {
680
+ // We can unwrap this becuase if there is an error the current transaction is aborted in the map_err call
681
+ let python_iter =crate :: bindings:: transformers:: transform_stream ( & task. 0 , & args. 0 , input)
682
+ . map_err ( |e|error ! ( "{e}" ) )
683
+ . unwrap ( ) ;
684
+ let res =TransformStreamIterator :: new ( python_iter) ;
685
+ SetOfIterator :: new ( res)
686
+ }
687
+
688
+ #[ cfg( all( feature ="python" , not( feature ="use_as_lib" ) ) ) ]
689
+ #[ pg_extern( immutable, parallel_safe, name ="transform_stream" ) ]
690
+ #[ allow( unused_variables) ] // cache is maintained for api compatibility
691
+ pub fn transform_stream_string (
692
+ task : String ,
693
+ args : default ! ( JsonB , "'{}'" ) ,
694
+ input : default ! ( & str , "''" ) ,
695
+ cache : default ! ( bool , false ) ,
696
+ ) ->SetOfIterator < ' static , String > {
697
+ let task_json =json ! ( { "task" : task} ) ;
698
+ // We can unwrap this becuase if there is an error the current transaction is aborted in the map_err call
699
+ let python_iter =crate :: bindings:: transformers:: transform_stream ( & task_json, & args. 0 , input)
700
+ . map_err ( |e|error ! ( "{e}" ) )
701
+ . unwrap ( ) ;
702
+ let res =TransformStreamIterator :: new ( python_iter) ;
703
+ SetOfIterator :: new ( res)
704
+ }
705
+
635
706
#[ cfg( feature ="python" ) ]
636
707
#[ pg_extern( immutable, parallel_safe, name ="generate" ) ]
637
708
fn generate ( project_name : & str , inputs : & str , config : default ! ( JsonB , "'{}'" ) ) ->String {